use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use futures::future::{self, Either};
use itertools::Itertools;
use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy, ShardSchedulingPolicy};
use pageserver_api::models::{LocationConfig, LocationConfigMode, TenantConfig};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, instrument};
use utils::generation::Generation;
use utils::id::NodeId;
use utils::seqwait::{SeqWait, SeqWaitError};
use utils::shard::ShardCount;
use utils::sync::gate::GateGuard;

use crate::compute_hook::ComputeHook;
use crate::metrics::{
    self, ReconcileCompleteLabelGroup, ReconcileLongRunningLabelGroup, ReconcileOutcome,
};
use crate::node::Node;
use crate::persistence::split_state::SplitState;
use crate::persistence::{Persistence, TenantShardPersistence};
use crate::reconciler::{
    ReconcileError, ReconcileUnits, Reconciler, ReconcilerConfig, TargetState,
    attached_location_conf, secondary_location_conf,
};
use crate::scheduler::{
    AffinityScore, AttachedShardTag, NodeSchedulingScore, NodeSecondarySchedulingScore,
    RefCountUpdate, ScheduleContext, ScheduleError, Scheduler, SecondaryShardTag, ShardTag,
};
use crate::service::ReconcileResultRequest;
use crate::timeline_import::TimelineImportState;
use crate::{Sequence, service};

/// Serialization helper
fn read_last_error<S, T>(v: &std::sync::Mutex<Option<T>>, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::ser::Serializer,
    T: std::fmt::Display,
{
    serializer.collect_str(
        &v.lock()
            .unwrap()
            .as_ref()
            .map(|e| format!("{e}"))
            .unwrap_or("".to_string()),
    )
}

/// In-memory state for a particular tenant shard.
///
/// This struct implements Serialize for debugging purposes, but is _not_ persisted
/// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
#[derive(Serialize)]
pub(crate) struct TenantShard {
    pub(crate) tenant_shard_id: TenantShardId,

    pub(crate) shard: ShardIdentity,

    // Runtime only: sequence used to coordinate updates to this object while
    // background reconcilers may be running.  A reconciler runs to a particular
    // sequence.
    pub(crate) sequence: Sequence,

    // Latest generation number: next time we attach, increment this
    // and use the incremented number when attaching.
    //
    // None represents an incompletely onboarded tenant via the [`Service::location_config`]
    // API, where this tenant may only run in PlacementPolicy::Secondary.
    pub(crate) generation: Option<Generation>,

    // High level description of how the tenant should be set up.  Provided
    // externally.
    pub(crate) policy: PlacementPolicy,

    // Low level description of exactly which pageservers should fulfil
    // which role.  Generated by `Self::schedule`.
    pub(crate) intent: IntentState,

    // Low level description of how the tenant is configured on pageservers:
    // if this does not match `Self::intent` then the tenant needs reconciliation
    // with `Self::reconcile`.
    pub(crate) observed: ObservedState,

    // Tenant configuration, passed through opaquely to the pageserver.  Identical
    // for all shards in a tenant.
    pub(crate) config: TenantConfig,

    /// If a reconcile task is currently in flight, it may be joined here (it is
    /// only safe to join if either the result has been received or the reconciler's
    /// cancellation token has been fired)
    #[serde(skip)]
    pub(crate) reconciler: Option<ReconcilerHandle>,

    /// If a tenant is being split, then all shards with that TenantId will have a
    /// SplitState set; this acts as a guard against other operations such as background
    /// reconciliation and timeline creation.
    pub(crate) splitting: SplitState,

    /// Flag indicating whether the tenant has an in-progress timeline import.
    /// Used to disallow shard splits while an import is in progress.
    pub(crate) importing: TimelineImportState,

    /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
    /// is set. This flag is cleared when the tenant is popped off the delay queue.
    pub(crate) delayed_reconcile: bool,

    /// Optionally wait for reconciliation to complete up to a particular
    /// sequence number.
    #[serde(skip)]
    pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,

    /// Indicates sequence number for which we have encountered an error reconciling.  If
    /// this advances ahead of [`Self::waiter`] then a reconciliation error has occurred,
    /// and callers should stop waiting for `waiter` and propagate the error.
    #[serde(skip)]
    pub(crate) error_waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,

    /// The most recent error from a reconcile on this tenant.  This is a nested Arc
    /// because:
    ///  - ReconcileWaiters need to Arc-clone the overall object to read it later
    ///  - ReconcileWaitError needs to use an `Arc<ReconcileError>` because we can construct
    ///    many waiters for one shard, and the underlying error types are not Clone.
    ///
    /// TODO: generalize to an array of recent events
    /// TODO: use an `ArcSwap` instead of a mutex for faster reads?
    #[serde(serialize_with = "read_last_error")]
    pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,

    /// Number of consecutive [`crate::service::Service::reconcile_all`] iterations that have
    /// scheduled a reconciliation for this shard.
    ///
    /// If this reaches `MAX_CONSECUTIVE_RECONCILES`, the shard is considered "stuck" and will be
    /// ignored when deciding whether optimizations can run. This includes both successful and failed
    /// reconciliations.
    ///
    /// Incremented in [`crate::service::Service::process_result`], and reset to 0 when
    /// [`crate::service::Service::reconcile_all`] determines no reconciliation is needed for this shard.
    pub(crate) consecutive_reconciles_count: usize,

    /// If we have a pending compute notification that for some reason we weren't able to send,
    /// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
    /// and trigger a Reconciler run.  This is the mechanism by which compute notifications are included in the scope
    /// of state that we publish externally in an eventually consistent way.
    pub(crate) pending_compute_notification: bool,

    /// To do a graceful migration, set this field to the destination pageserver, and optimization
    /// functions will consider this node the best location and react appropriately.
    preferred_node: Option<NodeId>,

    // Support/debug tool: if something is going wrong or flapping with scheduling, this may
    // be set to a non-active state to avoid making changes while the issue is fixed.
    scheduling_policy: ShardSchedulingPolicy,
}

#[derive(Clone, Debug, Serialize)]
pub(crate) struct IntentState {
    attached: Option<NodeId>,
    secondary: Vec<NodeId>,

    // We should attempt to schedule this shard in the provided AZ to
    // decrease chances of cross-AZ compute.
    preferred_az_id: Option<AvailabilityZone>,
}

impl IntentState {
    pub(crate) fn new(preferred_az_id: Option<AvailabilityZone>) -> Self {
        Self {
            attached: None,
            secondary: vec![],
            preferred_az_id,
        }
    }
    pub(crate) fn single(
        scheduler: &mut Scheduler,
        node_id: Option<NodeId>,
        preferred_az_id: Option<AvailabilityZone>,
    ) -> Self {
        if let Some(node_id) = node_id {
            scheduler.update_node_ref_counts(
                node_id,
                preferred_az_id.as_ref(),
                RefCountUpdate::Attach,
            );
        }
        Self {
            attached: node_id,
            secondary: vec![],
            preferred_az_id,
        }
    }

    pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
        if self.attached != new_attached {
            if let Some(old_attached) = self.attached.take() {
                scheduler.update_node_ref_counts(
                    old_attached,
                    self.preferred_az_id.as_ref(),
                    RefCountUpdate::Detach,
                );
            }
            if let Some(new_attached) = &new_attached {
                scheduler.update_node_ref_counts(
                    *new_attached,
                    self.preferred_az_id.as_ref(),
                    RefCountUpdate::Attach,
                );
            }
            self.attached = new_attached;
        }

        if let Some(new_attached) = &new_attached {
            assert!(!self.secondary.contains(new_attached));
        }
    }

    /// Like set_attached, but the node is from [`Self::secondary`].  This swaps the node from
    /// secondary to attached while maintaining the scheduler's reference counts.
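    ///
    /// A minimal sketch (assumes a `Scheduler` and an `IntentState` with no attached location;
    /// ignored by rustdoc):
    /// ```ignore
    /// intent.push_secondary(&mut scheduler, NodeId(2));
    /// intent.promote_attached(&mut scheduler, NodeId(2));
    /// assert_eq!(intent.get_attached(), &Some(NodeId(2)));
    /// assert!(!intent.get_secondary().contains(&NodeId(2)));
    /// ```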
    pub(crate) fn promote_attached(
        &mut self,
        scheduler: &mut Scheduler,
        promote_secondary: NodeId,
    ) {
        // If we call this with a node that isn't in secondary, it would cause incorrect
        // scheduler reference counting, since we assume the node is already referenced as a secondary.
        debug_assert!(self.secondary.contains(&promote_secondary));

        self.secondary.retain(|n| n != &promote_secondary);

        let demoted = self.attached;
        self.attached = Some(promote_secondary);

        scheduler.update_node_ref_counts(
            promote_secondary,
            self.preferred_az_id.as_ref(),
            RefCountUpdate::PromoteSecondary,
        );
        if let Some(demoted) = demoted {
            scheduler.update_node_ref_counts(
                demoted,
                self.preferred_az_id.as_ref(),
                RefCountUpdate::DemoteAttached,
            );
        }
    }

    pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
        // Every assertion here should probably have a corresponding check in
        // `validate_optimization` unless it is an invariant that should never be violated. Note
        // that the lock is not held between planning optimizations and applying them so you have to
        // assume any valid state transition of the intent state may have occurred.
        assert!(!self.secondary.contains(&new_secondary));
        assert!(self.attached != Some(new_secondary));
        scheduler.update_node_ref_counts(
            new_secondary,
            self.preferred_az_id.as_ref(),
            RefCountUpdate::AddSecondary,
        );
        self.secondary.push(new_secondary);
    }

    /// It is legal to call this with a node that is not currently a secondary: that is a no-op
    pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
        let index = self.secondary.iter().position(|n| *n == node_id);
        if let Some(index) = index {
            scheduler.update_node_ref_counts(
                node_id,
                self.preferred_az_id.as_ref(),
                RefCountUpdate::RemoveSecondary,
            );
            self.secondary.remove(index);
        }
    }

    pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
        for secondary in self.secondary.drain(..) {
            scheduler.update_node_ref_counts(
                secondary,
                self.preferred_az_id.as_ref(),
                RefCountUpdate::RemoveSecondary,
            );
        }
    }

    /// Remove the last secondary node from the list of secondaries
    pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
        if let Some(node_id) = self.secondary.pop() {
            scheduler.update_node_ref_counts(
                node_id,
                self.preferred_az_id.as_ref(),
                RefCountUpdate::RemoveSecondary,
            );
        }
    }

    pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
        if let Some(old_attached) = self.attached.take() {
            scheduler.update_node_ref_counts(
                old_attached,
                self.preferred_az_id.as_ref(),
                RefCountUpdate::Detach,
            );
        }

        self.clear_secondary(scheduler);
    }

    pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
        let mut result = Vec::new();
        if let Some(p) = self.attached {
            result.push(p)
        }

        result.extend(self.secondary.iter().copied());

        result
    }

    pub(crate) fn get_attached(&self) -> &Option<NodeId> {
        &self.attached
    }

    pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
        &self.secondary
    }

    /// If the node is in use as the attached location, demote it into
    /// the list of secondary locations.  This is used when a node goes offline,
    /// and we want to use a different node for attachment, but not permanently
    /// forget the location on the offline node.
    ///
    /// Returns true if a change was made
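    ///
    /// A minimal sketch (assumes `NodeId(1)` is currently the attached location; ignored by
    /// rustdoc):
    /// ```ignore
    /// assert!(intent.demote_attached(&mut scheduler, NodeId(1)));
    /// assert_eq!(intent.get_attached(), &None);
    /// assert!(intent.get_secondary().contains(&NodeId(1)));
    /// // Calling again is a no-op: node 1 is no longer attached.
    /// assert!(!intent.demote_attached(&mut scheduler, NodeId(1)));
    /// ```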
    pub(crate) fn demote_attached(&mut self, scheduler: &mut Scheduler, node_id: NodeId) -> bool {
        if self.attached == Some(node_id) {
            self.attached = None;
            self.secondary.push(node_id);
            scheduler.update_node_ref_counts(
                node_id,
                self.preferred_az_id.as_ref(),
                RefCountUpdate::DemoteAttached,
            );
            true
        } else {
            false
        }
    }

    pub(crate) fn set_preferred_az(
        &mut self,
        scheduler: &mut Scheduler,
        preferred_az: Option<AvailabilityZone>,
    ) {
        let new_az = preferred_az.as_ref();
        let old_az = self.preferred_az_id.as_ref();

        if old_az != new_az {
            if let Some(node_id) = self.attached {
                scheduler.update_node_ref_counts(
                    node_id,
                    new_az,
                    RefCountUpdate::ChangePreferredAzFrom(old_az),
                );
            }
            for node_id in &self.secondary {
                scheduler.update_node_ref_counts(
                    *node_id,
                    new_az,
                    RefCountUpdate::ChangePreferredAzFrom(old_az),
                );
            }
            self.preferred_az_id = preferred_az;
        }
    }

    pub(crate) fn get_preferred_az(&self) -> Option<&AvailabilityZone> {
        self.preferred_az_id.as_ref()
    }
}

impl Drop for IntentState {
    fn drop(&mut self) {
        // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
        // We do not check this while panicking, to avoid polluting unit test failures or
        // other assertions with this assertion's output.  It's still wrong to leak these,
        // but if we already have a panic then we don't need to independently flag this case.
        if !std::thread::panicking() {
            debug_assert!(self.attached.is_none() && self.secondary.is_empty());
        }
    }
}

#[derive(Default, Clone, Serialize, Deserialize, Debug)]
pub(crate) struct ObservedState {
    pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
}

/// Our latest knowledge of how this tenant is configured in the outside world.
///
/// Meaning:
///     * No instance of this type exists for a node: we are certain that we have nothing configured on that
///       node for this shard.
///     * Instance exists with conf==None: we *might* have some state on that node, but we don't know
///       what it is (e.g. we failed partway through configuring it)
///     * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
///       and that configuration will still be present unless something external interfered.
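///
/// A sketch of reading these three states from [`ObservedState`] (hypothetical `node_id`;
/// ignored by rustdoc):
/// ```ignore
/// match observed.locations.get(&node_id) {
///     None => { /* certain: nothing configured for this shard on the node */ }
///     Some(ObservedStateLocation { conf: None }) => { /* unknown: must inspect/reconcile */ }
///     Some(ObservedStateLocation { conf: Some(conf) }) => { /* last successfully applied config */ }
/// }
/// ```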
#[derive(Clone, Serialize, Deserialize, Debug)]
pub(crate) struct ObservedStateLocation {
    /// If None, it means we do not know the status of this shard's location on this node, but
    /// we know that we might have some state on this node.
    pub(crate) conf: Option<LocationConfig>,
}

pub(crate) struct ReconcilerWaiter {
    // For observability purposes, remember the ID of the shard we're
    // waiting for.
    pub(crate) tenant_shard_id: TenantShardId,

    seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
    error_seq_wait: std::sync::Arc<SeqWait<Sequence, Sequence>>,
    error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
    seq: Sequence,
}

pub(crate) enum ReconcilerStatus {
    Done,
    Failed,
    InProgress,
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum ReconcileWaitError {
    #[error("Timeout waiting for shard {0}")]
    Timeout(TenantShardId),
    #[error("shutting down")]
    Shutdown,
    #[error("Reconcile error on shard {0}: {1}")]
    Failed(TenantShardId, Arc<ReconcileError>),
}

#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) struct ReplaceSecondary {
    old_node_id: NodeId,
    new_node_id: NodeId,
}

#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) struct MigrateAttachment {
    pub(crate) old_attached_node_id: NodeId,
    pub(crate) new_attached_node_id: NodeId,
}

#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) enum ScheduleOptimizationAction {
    // Replace one of our secondary locations with a different node
    ReplaceSecondary(ReplaceSecondary),
    // Migrate attachment to an existing secondary location
    MigrateAttachment(MigrateAttachment),
    // Create a secondary location, with the intent of later migrating to it
    CreateSecondary(NodeId),
    // Remove a secondary location that we previously created to facilitate a migration
    RemoveSecondary(NodeId),
}

#[derive(Eq, PartialEq, Debug, Clone)]
pub(crate) struct ScheduleOptimization {
    // What was the reconcile sequence when we generated this optimization?  The optimization
    // should only be applied if the shard's sequence is still at this value, in case other changes
    // happened between planning the optimization and applying it.
    sequence: Sequence,

    pub(crate) action: ScheduleOptimizationAction,
}

impl ReconcilerWaiter {
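    /// Wait until the reconciler reaches this waiter's sequence number, or fail on timeout,
    /// shutdown, or a reconcile error for that sequence.
    ///
    /// A usage sketch (hypothetical caller holding a waiter from a spawned reconciler; ignored
    /// by rustdoc):
    /// ```ignore
    /// match waiter.wait_timeout(Duration::from_secs(30)).await {
    ///     Ok(()) => { /* reconcile for this sequence completed */ }
    ///     Err(ReconcileWaitError::Timeout(tid)) => { /* still in progress for shard `tid` */ }
    ///     Err(e) => { /* shutdown, or the reconcile failed */ }
    /// }
    /// ```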
    pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
        tokio::select! {
            result = self.seq_wait.wait_for_timeout(self.seq, timeout)=> {
                result.map_err(|e| match e {
                    SeqWaitError::Timeout => ReconcileWaitError::Timeout(self.tenant_shard_id),
                    SeqWaitError::Shutdown => ReconcileWaitError::Shutdown
                })?;
            },
            result = self.error_seq_wait.wait_for(self.seq) => {
                result.map_err(|e| match e {
                    SeqWaitError::Shutdown => ReconcileWaitError::Shutdown,
                    SeqWaitError::Timeout => unreachable!()
                })?;

                return Err(ReconcileWaitError::Failed(self.tenant_shard_id,
                    self.error.lock().unwrap().clone().expect("If error_seq_wait was advanced error was set").clone()))
            }
        }

        Ok(())
    }

    pub(crate) fn get_status(&self) -> ReconcilerStatus {
        if self.seq_wait.would_wait_for(self.seq).is_ok() {
            ReconcilerStatus::Done
        } else if self.error_seq_wait.would_wait_for(self.seq).is_ok() {
            ReconcilerStatus::Failed
        } else {
            ReconcilerStatus::InProgress
        }
    }
}

/// When a reconciler task has been spawned, the tenant shard's state carries enough
/// information to optionally cancel & await it later.
pub(crate) struct ReconcilerHandle {
    sequence: Sequence,
    handle: JoinHandle<()>,
    cancel: CancellationToken,
}

pub(crate) enum ReconcileNeeded {
    /// shard either doesn't need reconciliation, or is forbidden from spawning a reconciler
    /// in its current state (e.g. shard split in progress, or ShardSchedulingPolicy forbids it)
    No,
    /// shard has a reconciler running, and its intent hasn't changed since that one was
    /// spawned: wait for the existing reconciler rather than spawning a new one.
    WaitExisting(ReconcilerWaiter),
    /// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
    Yes(ReconcileReason),
}

#[derive(Debug)]
pub(crate) enum ReconcileReason {
    ActiveNodesDirty,
    UnknownLocation,
    PendingComputeNotification,
}

/// Pending modification to the observed state of a tenant shard.
/// Produced by [`Reconciler::observed_deltas`] and applied in [`crate::service::Service::process_result`].
pub(crate) enum ObservedStateDelta {
    Upsert(Box<(NodeId, ObservedStateLocation)>),
    Delete(NodeId),
}

impl ObservedStateDelta {
    pub(crate) fn node_id(&self) -> &NodeId {
        match self {
            Self::Upsert(up) => &up.0,
            Self::Delete(nid) => nid,
        }
    }
}

/// When a reconcile task completes, it sends this result object
/// to be applied to the primary TenantShard.
pub(crate) struct ReconcileResult {
    pub(crate) sequence: Sequence,
    /// On errors, `observed` should be treated as an incomplete description
    /// of state (i.e. any nodes present in the result should override nodes
    /// present in the parent tenant state, but any unmentioned nodes should
    /// not be removed from parent tenant state)
    pub(crate) result: Result<(), ReconcileError>,

    pub(crate) tenant_shard_id: TenantShardId,
    pub(crate) generation: Option<Generation>,
    pub(crate) observed_deltas: Vec<ObservedStateDelta>,

    /// Set [`TenantShard::pending_compute_notification`] from this flag
    pub(crate) pending_compute_notification: bool,
}

impl ObservedState {
    pub(crate) fn new() -> Self {
        Self {
            locations: HashMap::new(),
        }
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.locations.is_empty()
    }
}

impl TenantShard {
    pub(crate) fn new(
        tenant_shard_id: TenantShardId,
        shard: ShardIdentity,
        policy: PlacementPolicy,
        preferred_az_id: Option<AvailabilityZone>,
    ) -> Self {
        metrics::METRICS_REGISTRY
            .metrics_group
            .storage_controller_tenant_shards
            .inc();

        Self {
            tenant_shard_id,
            policy,
            intent: IntentState::new(preferred_az_id),
            generation: Some(Generation::new(0)),
            shard,
            observed: ObservedState::default(),
            config: TenantConfig::default(),
            reconciler: None,
            splitting: SplitState::Idle,
            importing: TimelineImportState::Idle,
            sequence: Sequence(1),
            delayed_reconcile: false,
            waiter: Arc::new(SeqWait::new(Sequence(0))),
            error_waiter: Arc::new(SeqWait::new(Sequence(0))),
            last_error: Arc::default(),
            consecutive_reconciles_count: 0,
            pending_compute_notification: false,
            scheduling_policy: ShardSchedulingPolicy::default(),
            preferred_node: None,
        }
    }

    /// For use on startup when learning state from pageservers: generate my [`IntentState`] from my
    /// [`ObservedState`], even if it violates my [`PlacementPolicy`].  Call [`Self::schedule`] next,
    /// to get an intent state that complies with placement policy.  The overall goal is to do scheduling
    /// in a way that makes use of any configured locations that already exist in the outside world.
    pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
        // Choose an attached location by filtering observed locations, and then sorting to get the highest
        // generation
        let mut attached_locs = self
            .observed
            .locations
            .iter()
            .filter_map(|(node_id, l)| {
                if let Some(conf) = &l.conf {
                    if conf.mode == LocationConfigMode::AttachedMulti
                        || conf.mode == LocationConfigMode::AttachedSingle
                        || conf.mode == LocationConfigMode::AttachedStale
                    {
                        Some((node_id, conf.generation))
                    } else {
                        None
                    }
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        attached_locs.sort_by_key(|i| i.1);
        if let Some((node_id, _gen)) = attached_locs.into_iter().next_back() {
            self.intent.set_attached(scheduler, Some(*node_id));
        }

        // All remaining observed locations generate secondary intents.  This includes None
        // observations, as these may well have some local content on disk that is usable (this
        // is an edge case that might occur if we restarted during a migration or other change)
        //
        // We may leave intent.attached empty if we didn't find any attached locations: [`Self::schedule`]
        // will take care of promoting one of these secondaries to be attached.
        self.observed.locations.keys().for_each(|node_id| {
            if Some(*node_id) != self.intent.attached {
                self.intent.push_secondary(scheduler, *node_id);
            }
        });
    }

    /// Part of [`Self::schedule`] that is used to choose exactly one node to act as the
    /// attached pageserver for a shard.
    ///
    /// Returns whether we modified it, and the NodeId selected.
    fn schedule_attached(
        &mut self,
        scheduler: &mut Scheduler,
        context: &ScheduleContext,
    ) -> Result<(bool, NodeId), ScheduleError> {
        // No work to do if we already have an attached tenant
        if let Some(node_id) = self.intent.attached {
            return Ok((false, node_id));
        }

        if let Some(promote_secondary) = self.preferred_secondary(scheduler) {
            // Promote a secondary
            tracing::debug!("Promoted secondary {} to attached", promote_secondary);
            self.intent.promote_attached(scheduler, promote_secondary);
            Ok((true, promote_secondary))
        } else {
            // Pick a fresh node: either we had no secondaries or none were schedulable
            let node_id = scheduler.schedule_shard::<AttachedShardTag>(
                &self.intent.secondary,
                &self.intent.preferred_az_id,
                context,
            )?;
            tracing::debug!("Selected {} as attached", node_id);
            self.intent.set_attached(scheduler, Some(node_id));
            Ok((true, node_id))
        }
    }

    #[instrument(skip_all, fields(
        tenant_id=%self.tenant_shard_id.tenant_id,
        shard_id=%self.tenant_shard_id.shard_slug(),
        sequence=%self.sequence
    ))]
    pub(crate) fn schedule(
        &mut self,
        scheduler: &mut Scheduler,
        context: &mut ScheduleContext,
    ) -> Result<(), ScheduleError> {
        let r = self.do_schedule(scheduler, context);

        context.avoid(&self.intent.all_pageservers());

        r
    }

    pub(crate) fn do_schedule(
        &mut self,
        scheduler: &mut Scheduler,
        context: &ScheduleContext,
    ) -> Result<(), ScheduleError> {
        // TODO: before scheduling new nodes, check if any existing content in
        // self.intent refers to pageservers that are offline, and pick other
        // pageservers if so.

        // TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
        // change their attach location.

        match self.scheduling_policy {
            ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
            ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
                // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
                tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "Scheduling is disabled by policy {:?}", self.scheduling_policy);
                return Ok(());
            }
        }

        // Build the set of pageservers already in use by this tenant, to avoid scheduling
        // more work on the same pageservers we're already using.
        let mut modified = false;

        // Add/remove nodes to fulfil policy
        use PlacementPolicy::*;
        match self.policy {
            Attached(secondary_count) => {
                // Should have exactly one attached, and at least N secondaries
                let (modified_attached, attached_node_id) =
                    self.schedule_attached(scheduler, context)?;
                modified |= modified_attached;

                let mut used_pageservers = vec![attached_node_id];
                while self.intent.secondary.len() < secondary_count {
                    let node_id = scheduler.schedule_shard::<SecondaryShardTag>(
                        &used_pageservers,
                        &self.intent.preferred_az_id,
                        context,
                    )?;
                    self.intent.push_secondary(scheduler, node_id);
                    used_pageservers.push(node_id);
                    modified = true;
                }
            }
            Secondary => {
                if let Some(node_id) = self.intent.get_attached() {
                    // Populate secondary by demoting the attached node
                    self.intent.demote_attached(scheduler, *node_id);

                    modified = true;
                } else if self.intent.secondary.is_empty() {
                    // Populate secondary by scheduling a fresh node
                    //
                    // We use [`AttachedShardTag`] because when a secondary location is the only one
                    // a shard has, we expect that its next use will be as an attached location: we want
                    // the tenant to be ready to warm up and run fast in their preferred AZ.
                    let node_id = scheduler.schedule_shard::<AttachedShardTag>(
                        &[],
                        &self.intent.preferred_az_id,
                        context,
                    )?;
                    self.intent.push_secondary(scheduler, node_id);
                    modified = true;
                }
                while self.intent.secondary.len() > 1 {
                    // If we have multiple secondaries (e.g. when transitioning from Attached to Secondary and
                    // having just demoted our attached location), then we should prefer to keep the location
                    // in our preferred AZ.  Tenants in Secondary mode want to be in the preferred AZ so that
                    // they have a warm location to become attached when transitioning back into Attached.

                    let mut candidates = self.intent.get_secondary().clone();
                    // Sort to get secondaries outside preferred AZ last
                    candidates
                        .sort_by_key(|n| scheduler.get_node_az(n).as_ref() != self.preferred_az());
                    let secondary_to_remove = candidates.pop().unwrap();
                    self.intent.remove_secondary(scheduler, secondary_to_remove);
                    modified = true;
                }
            }
            Detached => {
                // Never add locations in this mode
                if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
                    self.intent.clear(scheduler);
                    modified = true;
                }
            }
        }

        if modified {
            self.sequence.0 += 1;
        }

        Ok(())
    }

    /// Reschedule this tenant shard to one of its secondary locations. Returns a scheduling error
    /// if the swap is not possible and leaves the intent state in its original state.
    ///
    /// Arguments:
    /// `promote_to`: an optional secondary location of this tenant shard. If set to None, we ask
    /// the scheduler to recommend a node
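    ///
    /// A sketch (hypothetical; ignored by rustdoc). Note that an explicit target must already
    /// be one of the shard's secondaries:
    /// ```ignore
    /// shard.reschedule_to_secondary(Some(NodeId(3)), &mut scheduler)?; // explicit secondary
    /// shard.reschedule_to_secondary(None, &mut scheduler)?;            // scheduler picks one
    /// ```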
    pub(crate) fn reschedule_to_secondary(
        &mut self,
        promote_to: Option<NodeId>,
        scheduler: &mut Scheduler,
    ) -> Result<(), ScheduleError> {
        let promote_to = match promote_to {
            Some(node) => node,
            None => match self.preferred_secondary(scheduler) {
                Some(node) => node,
                None => {
                    return Err(ScheduleError::ImpossibleConstraint);
                }
            },
        };

        assert!(self.intent.get_secondary().contains(&promote_to));

        if let Some(node) = self.intent.get_attached() {
            let demoted = self.intent.demote_attached(scheduler, *node);
            if !demoted {
                return Err(ScheduleError::ImpossibleConstraint);
            }
        }

        self.intent.promote_attached(scheduler, promote_to);

        // Increment the sequence number for the edge case where a
        // reconciler is already running to avoid waiting on the
        // current reconcile instead of spawning a new one.
        self.sequence = self.sequence.next();

        Ok(())
    }

    /// Returns None if the current location's score is unavailable, i.e. cannot draw a conclusion
    fn is_better_location<T: ShardTag>(
        &self,
        scheduler: &mut Scheduler,
        schedule_context: &ScheduleContext,
        current: NodeId,
        candidate: NodeId,
    ) -> Option<bool> {
        let Some(candidate_score) = scheduler.compute_node_score::<T::Score>(
            candidate,
            &self.intent.preferred_az_id,
            schedule_context,
        ) else {
            // The candidate node is unavailable for scheduling or otherwise couldn't get a score
            return None;
        };

        // If the candidate is our preferred node, then it is better than the current location, as long
        // as it is online -- the online check is part of the score calculation we did above, so it's
        // important that this check comes after that one.
        if let Some(preferred) = self.preferred_node.as_ref() {
            if preferred == &candidate {
                return Some(true);
            }
        }

        match scheduler.compute_node_score::<T::Score>(
            current,
            &self.intent.preferred_az_id,
            schedule_context,
        ) {
            Some(current_score) => {
                // Ignore utilization components when comparing scores: we don't want to migrate
                // because of transient load variations, it risks making the system thrash, and
                // migrating for utilization requires a separate high level view of the system to
                // e.g. prioritize moving larger or smaller tenants, rather than arbitrarily
                // moving things around in the order that we hit this function.
                let candidate_score = candidate_score.for_optimization();
                let current_score = current_score.for_optimization();

                if candidate_score < current_score {
                    tracing::info!(
                        "Found a lower scoring location! {candidate} is better than {current} ({candidate_score:?} is better than {current_score:?})"
                    );
                    Some(true)
                } else {
                    // The candidate node is no better than our current location, so don't migrate
                    tracing::debug!(
                        "Candidate node {candidate} is no better than our current location {current} (candidate {candidate_score:?} vs current {current_score:?})",
                    );
                    Some(false)
                }
            }
            None => {
                // The current node is unavailable for scheduling, so we can't make any sensible
                // decisions about optimisation.  This should be a transient state -- if the node
                // is offline then it will get evacuated; if it is blocked by a scheduling mode
                // then we will respect that mode by doing nothing.
                tracing::debug!("Current node {current} is unavailable for scheduling");
                None
            }
        }
    }

    pub(crate) fn find_better_location<T: ShardTag>(
        &self,
        scheduler: &mut Scheduler,
        schedule_context: &ScheduleContext,
        current: NodeId,
        hard_exclude: &[NodeId],
    ) -> Option<NodeId> {
        // If we have a migration hint, then that is our better location
        if let Some(hint) = self.preferred_node.as_ref() {
            if hint == &current {
                return None;
            }

            return Some(*hint);
        }

        // Look for a lower-scoring location to attach to
        let Ok(candidate_node) = scheduler.schedule_shard::<T>(
            hard_exclude,
            &self.intent.preferred_az_id,
            schedule_context,
        ) else {
            // A scheduling error means we have no possible candidate replacements
            tracing::debug!("No candidate node found");
            return None;
        };

        if candidate_node == current {
            // We're already at the best possible location, so don't migrate
            tracing::debug!("Candidate node {candidate_node} is already in use");
            return None;
        }

        self.is_better_location::<T>(scheduler, schedule_context, current, candidate_node)
            .and_then(|better| if better { Some(candidate_node) } else { None })
    }

    /// This function is an optimization, used to avoid doing large numbers of scheduling operations
    /// when looking for optimizations.  It uses knowledge of how scores work to do some fast checks
    /// for whether it may be possible to improve a score.
    ///
    /// If we return true, it only means that optimization _might_ be possible, not that it
    /// necessarily is.  If we return false, it definitely means that calling
    /// [`Self::optimize_attachment`] or [`Self::optimize_secondary`] would do no work.
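    ///
    /// A usage sketch of the intended cheap-check-then-plan pattern (hypothetical caller;
    /// ignored by rustdoc):
    /// ```ignore
    /// if shard.maybe_optimizable(&mut scheduler, &schedule_context) {
    ///     // Only pay for full optimization planning when the fast check passes.
    ///     let _maybe_opt = shard.optimize_attachment(&mut scheduler, &schedule_context);
    /// }
    /// ```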
    pub(crate) fn maybe_optimizable(
        &self,
        scheduler: &mut Scheduler,
        schedule_context: &ScheduleContext,
    ) -> bool {
        // Tenant with preferred node: check if it is not already at the preferred node
        if let Some(preferred) = self.preferred_node.as_ref() {
            if Some(preferred) != self.intent.get_attached().as_ref() {
                return true;
            }
        }

        // Sharded tenant: check if any locations have a nonzero affinity score
        if self.shard.count >= ShardCount(1) {
            let schedule_context = schedule_context.project_detach(self);
            for node in self.intent.all_pageservers() {
                if let Some(af) = schedule_context.nodes.get(&node) {
                    if *af > AffinityScore(0) {
                        return true;
                    }
                }
            }
        }

        // Attached tenant: check if the attachment is outside the preferred AZ
        if let PlacementPolicy::Attached(_) = self.policy {
            if let Some(attached) = self.intent.get_attached() {
                if scheduler.get_node_az(attached) != self.intent.preferred_az_id {
                    return true;
                }
            }
        }

        // Tenant with secondary locations: check if any are within the preferred AZ
        for secondary in self.intent.get_secondary() {
            if scheduler.get_node_az(secondary) == self.intent.preferred_az_id {
                return true;
            }
        }

        // Does the tenant have excess secondaries?
        if self.intent.get_secondary().len() > self.policy.want_secondaries() {
            return true;
        }

        // Fall through: no optimizations possible
        false
    }

    /// Optimize attachments: if a shard has a secondary location that is preferable to
    /// its primary location based on soft constraints, switch that secondary location
    /// to be attached.
    ///
    /// `schedule_context` should have been populated with all shards in the tenant, including
    /// the one we're trying to optimize (this function will subtract its own contribution before making scoring decisions)
    #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    pub(crate) fn optimize_attachment(
        &self,
        scheduler: &mut Scheduler,
        schedule_context: &ScheduleContext,
    ) -> Option<ScheduleOptimization> {
        let attached = (*self.intent.get_attached())?;

        let schedule_context = schedule_context.project_detach(self);

        // If we already have a secondary that is a better location than our current one,
        // then simply migrate to it.
        for secondary in self.intent.get_secondary() {
            if let Some(true) = self.is_better_location::<AttachedShardTag>(
                scheduler,
                &schedule_context,
                attached,
                *secondary,
            ) {
                return Some(ScheduleOptimization {
                    sequence: self.sequence,
                    action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
                        old_attached_node_id: attached,
                        new_attached_node_id: *secondary,
                    }),
                });
            }
        }

        // Given that none of our current secondaries is a better location than our current
        // attached location (checked above), we may trim any secondaries that are not needed
        // for the placement policy.
        if self.intent.get_secondary().len() > self.policy.want_secondaries() {
            // This code path cleans up extra secondaries after migrating, and/or
            // trims extra secondaries after a PlacementPolicy::Attached(N) was
            // modified to decrease N.

            let secondary_scores = self
                .intent
                .get_secondary()
                .iter()
                .map(|node_id| {
                    (
                        *node_id,
                        scheduler.compute_node_score::<NodeSecondarySchedulingScore>(
                            *node_id,
                            &self.intent.preferred_az_id,
                            &schedule_context,
                        ),
                    )
                })
                .collect::<HashMap<_, _>>();

            if secondary_scores.iter().any(|score| score.1.is_none()) {
                // Trivial case: if we only have one secondary, drop that one
                if self.intent.get_secondary().len() == 1 {
                    return Some(ScheduleOptimization {
                        sequence: self.sequence,
                        action: ScheduleOptimizationAction::RemoveSecondary(
                            *self.intent.get_secondary().first().unwrap(),
                        ),
                    });
                }

                // Try to find a "good" secondary to keep, without relying on scores (one or more nodes is in a state
                // where its score can't be calculated), and drop the others.  This enables us to make progress in
                // most cases, even if some nodes are offline or have scheduling=pause set.

                debug_assert!(self.intent.attached.is_some()); // We should not make it here unless attached -- this
                // logic presumes we are in a mode where we want secondaries to be in non-home AZ
                if let Some(retain_secondary) = self.intent.get_secondary().iter().find(|n| {
                    let in_home_az = scheduler.get_node_az(n) == self.intent.preferred_az_id;
                    let is_available = secondary_scores
                        .get(n)
                        .expect("Built from same list of nodes")
                        .is_some();
                    is_available && !in_home_az
                }) {
                    // Great, we found one to retain.  Pick some other to drop.
                    if let Some(victim) = self
                        .intent
                        .get_secondary()
                        .iter()
                        .find(|n| n != &retain_secondary)
                    {
                        return Some(ScheduleOptimization {
                            sequence: self.sequence,
                            action: ScheduleOptimizationAction::RemoveSecondary(*victim),
                        });
                    }
                }

                // Fall through: we didn't identify one to remove.  This ought to be rare.
                tracing::warn!(
                    "Keeping extra secondaries: can't determine which of {:?} to remove (some nodes offline?)",
                    self.intent.get_secondary()
                );
            } else {
                let victim = secondary_scores
                    .iter()
                    .max_by_key(|score| score.1.unwrap())
                    .unwrap()
                    .0;
                return Some(ScheduleOptimization {
                    sequence: self.sequence,
                    action: ScheduleOptimizationAction::RemoveSecondary(*victim),
                });
            }
        }

        let replacement = self.find_better_location::<AttachedShardTag>(
            scheduler,
            &schedule_context,
            attached,
            &[], // Don't exclude secondaries: our preferred attachment location may be a secondary
        );

        // We have found a candidate and confirmed that its score is preferable
        // to our current location. See if we have a secondary location in the preferred location already: if not,
        // then create one.
        if let Some(replacement) = replacement {
            // If we are currently in a non-preferred AZ, then the scheduler might suggest a location that is
            // better, but still not in our preferred AZ.  Migration has a cost in resources and an impact on
            // the workload, so we want to avoid doing multiple hops where we might go to some other AZ before
            // eventually finding a suitable location in our preferred AZ: skip this optimization if it is not
            // in our final, preferred AZ.
            //
            // This should be a transient state, there should always be capacity eventually in our preferred AZ (even if nodes
            // there are too overloaded for scheduler to suggest them, more should be provisioned eventually).
            if self.preferred_node.is_none()
                && self.intent.preferred_az_id.is_some()
                && scheduler.get_node_az(&replacement) != self.intent.preferred_az_id
            {
                tracing::debug!(
                    "Candidate node {replacement} is not in preferred AZ {:?}",
                    self.intent.preferred_az_id
                );

                // This should only happen if our current location is not in the preferred AZ, otherwise
                // [`Self::find_better_location`] should have rejected any other location outside the preferred AZ, because
                // AZ is the highest priority part of NodeAttachmentSchedulingScore.
                debug_assert!(scheduler.get_node_az(&attached) != self.intent.preferred_az_id);

                return None;
            }

            if !self.intent.get_secondary().contains(&replacement) {
                Some(ScheduleOptimization {
                    sequence: self.sequence,
                    action: ScheduleOptimizationAction::CreateSecondary(replacement),
                })
            } else {
                // We already have a secondary in the preferred location, let's try migrating to it.  Our caller
                // will check the warmth of the destination before deciding whether to really execute this.
                Some(ScheduleOptimization {
                    sequence: self.sequence,
                    action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
                        old_attached_node_id: attached,
                        new_attached_node_id: replacement,
                    }),
                })
            }
        } else {
            // We didn't find somewhere we'd rather be, and we don't have any excess secondaries
            // to clean up: no action required.
            None
        }
    }

    #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    pub(crate) fn optimize_secondary(
        &self,
        scheduler: &mut Scheduler,
        schedule_context: &ScheduleContext,
    ) -> Option<ScheduleOptimization> {
        if self.intent.get_secondary().len() > self.policy.want_secondaries() {
            // We have extra secondaries, perhaps to facilitate a migration of the attached location:
            // do nothing, it is up to [`Self::optimize_attachment`] to clean them up.  When that's done,
            // and we are called again, we will proceed.
            tracing::debug!("Too many secondaries: skipping");
            return None;
        }

        let schedule_context = schedule_context.project_detach(self);

        for secondary in self.intent.get_secondary() {
            // Make sure we don't try to migrate a secondary to our attached location: this case happens
            // easily in environments without multiple AZs.
            let mut exclude = match self.intent.attached {
                Some(attached) => vec![attached],
                None => vec![],
            };

            // Exclude all other secondaries from the scheduling process to avoid replacing
            // one existing secondary with another existing secondary.
            for another_secondary in self.intent.secondary.iter() {
                if another_secondary != secondary {
                    exclude.push(*another_secondary);
                }
            }

            let replacement = match &self.policy {
                PlacementPolicy::Attached(_) => {
                    // Secondaries for an attached shard should be scheduled using `SecondaryShardTag`
                    // to avoid placing them in the preferred AZ.
                    self.find_better_location::<SecondaryShardTag>(
                        scheduler,
                        &schedule_context,
                        *secondary,
                        &exclude,
                    )
                }
                PlacementPolicy::Secondary => {
                    // In secondary-only mode, we want our secondary locations in the preferred AZ,
                    // so that they're ready to take over as an attached location when we transition
                    // into PlacementPolicy::Attached.
                    self.find_better_location::<AttachedShardTag>(
                        scheduler,
                        &schedule_context,
                        *secondary,
                        &exclude,
                    )
                }
                PlacementPolicy::Detached => None,
            };

            assert!(replacement != Some(*secondary));
            if let Some(replacement) = replacement {
                // We have found a candidate and confirmed that its score is preferable
                // to our current location. See if we have a secondary location in the preferred location already: if not,
                // then create one.
                return Some(ScheduleOptimization {
                    sequence: self.sequence,
                    action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
                        old_node_id: *secondary,
                        new_node_id: replacement,
                    }),
                });
            }
        }

        None
    }

    /// Start or abort a graceful migration of this shard to another pageserver. This works on top of the
    /// other optimisation functions, to bias them to move to the destination node.
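    ///
    /// A sketch of the graceful-migration flow (hypothetical; ignored by rustdoc):
    /// ```ignore
    /// shard.set_preferred_node(Some(NodeId(5))); // begin: optimizers now favour node 5
    /// // ... optimization passes create/warm a secondary on node 5, then migrate the attachment,
    /// // at which point `apply_optimization` clears the preference automatically ...
    /// shard.set_preferred_node(None);            // or abort the migration explicitly
    /// ```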
    pub(crate) fn set_preferred_node(&mut self, node: Option<NodeId>) {
        if let Some(hint) = self.preferred_node.as_ref() {
            if Some(hint) != node.as_ref() {
                // This is legal but a bit surprising: we expect that administrators wouldn't usually
                // change their mind about where to migrate something.
                tracing::warn!(
                    "Changing migration destination from {hint} to {node:?} (current intent {:?})",
                    self.intent
                );
            }
        }

        self.preferred_node = node;
    }

    pub(crate) fn get_preferred_node(&self) -> Option<NodeId> {
        self.preferred_node
    }

    /// Return true if the optimization was really applied: it will not be applied if the optimization's
    /// sequence is behind this tenant shard's or if the intent state proposed by the optimization
    /// is not compatible with the current intent state. The latter may happen when the background
    /// reconcile loop runs concurrently with HTTP-driven optimisations.
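    ///
    /// Sketch of the plan/apply pattern this protects (hypothetical; ignored by rustdoc):
    /// ```ignore
    /// let planned = shard.optimize_attachment(&mut scheduler, &schedule_context);
    /// // ... the lock may be dropped and re-taken here; the shard can change meanwhile ...
    /// if let Some(opt) = planned {
    ///     // Returns false if the shard's sequence advanced or the intent no longer matches.
    ///     let applied = shard.apply_optimization(&mut scheduler, opt);
    /// }
    /// ```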
    pub(crate) fn apply_optimization(
        &mut self,
        scheduler: &mut Scheduler,
        optimization: ScheduleOptimization,
    ) -> bool {
        if optimization.sequence != self.sequence {
            return false;
        }

        if !self.validate_optimization(&optimization) {
            tracing::info!(
                "Skipping optimization for {} because it does not match current intent: {:?}",
                self.tenant_shard_id,
                optimization,
            );
            return false;
        }

        metrics::METRICS_REGISTRY
            .metrics_group
            .storage_controller_schedule_optimization
            .inc();

        match optimization.action {
            ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
                old_attached_node_id,
                new_attached_node_id,
            }) => {
                self.intent.demote_attached(scheduler, old_attached_node_id);
                self.intent
                    .promote_attached(scheduler, new_attached_node_id);

                if let Some(hint) = self.preferred_node.as_ref() {
                    if hint == &new_attached_node_id {
                        // The migration target is not a long term pin: once we are done with the migration, clear it.
                        tracing::info!("Graceful migration to {hint} complete");
                        self.preferred_node = None;
                    }
                }
            }
            ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
                old_node_id,
                new_node_id,
            }) => {
                self.intent.remove_secondary(scheduler, old_node_id);
                self.intent.push_secondary(scheduler, new_node_id);
            }
            ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
                self.intent.push_secondary(scheduler, new_node_id);
            }
            ScheduleOptimizationAction::RemoveSecondary(old_secondary) => {
                self.intent.remove_secondary(scheduler, old_secondary);
            }
        }

        true
    }

    /// Check that the desired modifications to the intent state are compatible with the current
    /// intent state. Note that the lock is not held between planning optimizations and applying
    /// them so any valid state transition of the intent state may have occurred.
    fn validate_optimization(&self, optimization: &ScheduleOptimization) -> bool {
        match optimization.action {
            ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
                old_attached_node_id,
                new_attached_node_id,
            }) => {
                self.intent.attached == Some(old_attached_node_id)
                    && self.intent.secondary.contains(&new_attached_node_id)
            }
            ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
                old_node_id: _,
                new_node_id,
            }) => {
                // It's legal to remove a secondary that is not present in the intent state
                !self.intent.secondary.contains(&new_node_id)
                    // Ensure the secondary hasn't already been promoted to attached by a concurrent
                    // optimization/migration.
                    && self.intent.attached != Some(new_node_id)
            }
            ScheduleOptimizationAction::CreateSecondary(new_node_id) => {
                !self.intent.secondary.contains(&new_node_id)
            }
            ScheduleOptimizationAction::RemoveSecondary(_) => {
                // It's legal to remove a secondary that is not present in the intent state
                true
            }
        }
    }

    /// When a shard has several secondary locations, we need to pick one in situations where
    /// we promote one of them to an attached location:
    ///  - When draining a node for restart
    ///  - When responding to a node failure
    ///
    /// In this context, 'preferred' does not mean the node with the best scheduling score: instead
    /// we want to pick the node which is best for use _temporarily_ while the previous attached location
    /// is unavailable (e.g. because it's down or deploying).  That means we prefer to use secondary
    /// locations in a non-preferred AZ, as they're more likely to have a warm cache than a temporary
    /// secondary in the preferred AZ (those are usually only created for migrations, and if they exist
    /// they're probably not warmed up yet).
    ///
    /// If the input is empty, or none of the nodes are eligible for scheduling, return None: the
    /// caller needs to pick a node some other way.
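    ///
    /// Illustrative example (node IDs and AZs assumed; both nodes schedulable):
    /// ```ignore
    /// // Secondaries: node 4 in the preferred AZ, node 7 elsewhere.  The sort keys
    /// // become (1, NodeId(4)) and (0, NodeId(7)), so the non-preferred-AZ node 7
    /// // sorts first and is returned.
    /// assert_eq!(shard.preferred_secondary(&scheduler), Some(NodeId(7)));
    /// ```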
    pub(crate) fn preferred_secondary(&self, scheduler: &Scheduler) -> Option<NodeId> {
        let candidates = scheduler.filter_usable_nodes(&self.intent.secondary);

        // We will sort candidates to prefer nodes which are _not_ in our preferred AZ, i.e. we prefer
        // to migrate to a long-lived secondary location (which would have been scheduled in a non-preferred AZ),
        // rather than a short-lived secondary location being used for optimization/migration (which would have
        // been scheduled in our preferred AZ).
        let mut candidates = candidates
            .iter()
            .map(|(node_id, node_az)| {
                if node_az == &self.intent.preferred_az_id {
                    (1, *node_id)
                } else {
                    (0, *node_id)
                }
            })
            .collect::<Vec<_>>();

        candidates.sort();

        candidates.first().map(|i| i.1)
    }

    /// Query whether the tenant's observed state for the attached node matches its intent state, and if so,
    /// yield the node ID.  This is appropriate for emitting compute hook notifications: we are checking that
    /// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
    ///
    /// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
    /// function should not be used to decide whether to reconcile.
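    ///
    /// Illustrative example (assumed state; not compiled):
    /// ```ignore
    /// // Intent: attach to node 1.  Observed: node 1 reports a known (non-None)
    /// // LocationConfig in one of the Attached* modes.
    /// assert_eq!(shard.stably_attached(), Some(NodeId(1)));
    /// ```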
    pub(crate) fn stably_attached(&self) -> Option<NodeId> {
        // We have an intent to attach for this node
        let attach_intent = self.intent.attached?;
        // We have an observed state for this node
        let location = self.observed.locations.get(&attach_intent)?;
        // Our observed state is not None, i.e. not in flux
        let location_config = location.conf.as_ref()?;

        // Check if our intent and observed state agree that this node is in an attached state.
        match location_config.mode {
            LocationConfigMode::AttachedMulti
            | LocationConfigMode::AttachedSingle
            | LocationConfigMode::AttachedStale => Some(attach_intent),
            _ => None,
        }
    }

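    /// Returns true if reconciliation is needed on any currently-available node:
    /// i.e. if the observed config for an intended attached/secondary location
    /// differs from what we want, or if we observe a location that is not part of
    /// our intent at all.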
    fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
        let mut dirty_nodes = HashSet::new();

        if let Some(node_id) = self.intent.attached {
            // Maybe panic: it is a severe bug if we try to attach while generation is null.
            let generation = self
                .generation
                .expect("Attempted to enter attached state without a generation");

            let wanted_conf = attached_location_conf(
                generation,
                &self.shard,
                &self.config,
                &self.policy,
                self.intent.get_secondary().len(),
            );
            match self.observed.locations.get(&node_id) {
                Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
                Some(_) | None => {
                    dirty_nodes.insert(node_id);
                }
            }
        }

        for node_id in &self.intent.secondary {
            let wanted_conf = secondary_location_conf(&self.shard, &self.config);
            match self.observed.locations.get(node_id) {
                Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
                Some(_) | None => {
                    dirty_nodes.insert(*node_id);
                }
            }
        }

        for node_id in self.observed.locations.keys() {
            if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
                // We have observed state that isn't part of our intent: need to clean it up.
                dirty_nodes.insert(*node_id);
            }
        }

        dirty_nodes.retain(|node_id| {
            nodes
                .get(node_id)
                .map(|n| n.is_available())
                .unwrap_or(false)
        });

        !dirty_nodes.is_empty()
    }

    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    pub(crate) fn get_reconcile_needed(
        &mut self,
        pageservers: &Arc<HashMap<NodeId, Node>>,
    ) -> ReconcileNeeded {
        // If there are any ambiguous observed states, and the nodes they refer to are available,
        // we should reconcile to clean them up.
        let mut dirty_observed = false;
        for (node_id, observed_loc) in &self.observed.locations {
            let node = pageservers
                .get(node_id)
                .expect("Nodes may not be removed while referenced");
            if observed_loc.conf.is_none() && node.is_available() {
                dirty_observed = true;
                break;
            }
        }

        let active_nodes_dirty = self.dirty(pageservers);

        let reconcile_needed = match (
            active_nodes_dirty,
            dirty_observed,
            self.pending_compute_notification,
        ) {
            (true, _, _) => ReconcileNeeded::Yes(ReconcileReason::ActiveNodesDirty),
            (_, true, _) => ReconcileNeeded::Yes(ReconcileReason::UnknownLocation),
            (_, _, true) => ReconcileNeeded::Yes(ReconcileReason::PendingComputeNotification),
            _ => ReconcileNeeded::No,
        };

        if matches!(reconcile_needed, ReconcileNeeded::No) {
            tracing::debug!("Not dirty, no reconciliation needed.");
            return ReconcileNeeded::No;
        }

        // If we are currently splitting, then never start a reconciler task: the splitting logic
        // requires that shards are not interfered with while it runs. Do this check here rather than
        // up top, so that we only log this message if we would otherwise have done a reconciliation.
        if !matches!(self.splitting, SplitState::Idle) {
            tracing::info!("Refusing to reconcile, splitting in progress");
            return ReconcileNeeded::No;
        }

        // Reconcile already in flight for the current sequence?
        if let Some(handle) = &self.reconciler {
            if handle.sequence == self.sequence {
                tracing::info!(
                    "Reconciliation already in progress for sequence {:?}",
                    self.sequence,
                );
                return ReconcileNeeded::WaitExisting(ReconcilerWaiter {
                    tenant_shard_id: self.tenant_shard_id,
                    seq_wait: self.waiter.clone(),
                    error_seq_wait: self.error_waiter.clone(),
                    error: self.last_error.clone(),
                    seq: self.sequence,
                });
            }
        }

        // Pre-checks done: finally check whether we may actually do the work
        match self.scheduling_policy {
            ShardSchedulingPolicy::Active
            | ShardSchedulingPolicy::Essential
            | ShardSchedulingPolicy::Pause => {}
            ShardSchedulingPolicy::Stop => {
                // We only reach this point if there is work to do and we're going to skip
                // doing it: warn so it's obvious why this tenant isn't doing what it ought to.
                tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
                return ReconcileNeeded::No;
            }
        }

        reconcile_needed
    }

    /// Ensure the sequence number is set to a value where waiting for this value will make us wait
    /// for the next reconcile: i.e. it is ahead of all completed or running reconcilers.
    ///
    /// Constructing a ReconcilerWaiter with the resulting sequence number gives the property
    /// that the waiter will not complete until some future Reconciler is constructed and run.
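    ///
    /// For example (values assumed for illustration): if a reconciler ran at
    /// sequence 5 and `self.sequence` is still 5, this bumps `self.sequence` to 6,
    /// so a waiter on sequence 6 can only be satisfied by a future reconcile.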
    fn ensure_sequence_ahead(&mut self) {
        // Find the highest sequence for which a Reconciler has previously run or is currently
        // running
        let max_seen = std::cmp::max(
            self.reconciler
                .as_ref()
                .map(|r| r.sequence)
                .unwrap_or(Sequence(0)),
            std::cmp::max(self.waiter.load(), self.error_waiter.load()),
        );

        if self.sequence <= max_seen {
            self.sequence = max_seen.next();
        }
    }

    /// Create a waiter that will wait for some future Reconciler that hasn't been spawned yet.
    ///
    /// This is appropriate when you can't spawn a reconciler (e.g. due to resource limits), but
    /// you would like to wait on the next reconciler that gets spawned in the background.
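    ///
    /// Usage sketch (assumes the `wait_timeout` helper on [`ReconcilerWaiter`]; not
    /// compiled):
    /// ```ignore
    /// let waiter = shard.future_reconcile_waiter();
    /// // ...once a background reconciler has been spawned and runs to completion:
    /// waiter.wait_timeout(Duration::from_secs(30)).await?;
    /// ```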
    pub(crate) fn future_reconcile_waiter(&mut self) -> ReconcilerWaiter {
        self.ensure_sequence_ahead();

        ReconcilerWaiter {
            tenant_shard_id: self.tenant_shard_id,
            seq_wait: self.waiter.clone(),
            error_seq_wait: self.error_waiter.clone(),
            error: self.last_error.clone(),
            seq: self.sequence,
        }
    }

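    /// Drive a single [`Reconciler`] to completion: try to make observed state match
    /// intent, send any pending compute notification on success, record the outcome
    /// metric, and package everything into a [`ReconcileResult`].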
    async fn reconcile(
        sequence: Sequence,
        mut reconciler: Reconciler,
        must_notify: bool,
    ) -> ReconcileResult {
        // Attempt to make observed state match intent state
        let result = reconciler.reconcile().await;

        // If we know we had a pending compute notification from some previous action, send a notification irrespective
        // of whether the above reconcile() did any work.  It has to be Ok() though, because otherwise we might be
        // sending a notification of a location that isn't really attached.
        if result.is_ok() && must_notify {
            // If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
            reconciler.compute_notify().await.ok();
        } else if must_notify {
            // Carry this flag so that the reconciler's result will indicate that it still needs to retry
            // the compute hook notification eventually.
            reconciler.compute_notify_failure = true;
        }

        // Update result counter
        let outcome_label = match &result {
            Ok(_) => {
                if reconciler.compute_notify_failure {
                    ReconcileOutcome::SuccessNoNotify
                } else {
                    ReconcileOutcome::Success
                }
            }
            Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
            Err(_) => ReconcileOutcome::Error,
        };

        metrics::METRICS_REGISTRY
            .metrics_group
            .storage_controller_reconcile_complete
            .inc(ReconcileCompleteLabelGroup {
                status: outcome_label,
            });

        // Constructing result implicitly drops Reconciler, freeing any ReconcileUnits before the Service might
        // try and schedule more work in response to our result.
        ReconcileResult {
            sequence,
            result,
            tenant_shard_id: reconciler.tenant_shard_id,
            generation: reconciler.generation,
            observed_deltas: reconciler.observed_deltas(),
            pending_compute_notification: reconciler.compute_notify_failure,
        }
    }

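    /// Spawn a background task for this shard's current sequence: it cancels and joins
    /// any previous reconciler, then runs a fresh [`Reconciler`] and reports the outcome
    /// via `result_tx`.  Returns a waiter for the spawned reconcile.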
    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
    pub(crate) fn spawn_reconciler(
        &mut self,
        reason: ReconcileReason,
        result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
        pageservers: &Arc<HashMap<NodeId, Node>>,
        compute_hook: &Arc<ComputeHook>,
        reconciler_config: ReconcilerConfig,
        service_config: &service::Config,
        persistence: &Arc<Persistence>,
        units: ReconcileUnits,
        gate_guard: GateGuard,
        cancel: &CancellationToken,
        http_client: reqwest::Client,
    ) -> Option<ReconcilerWaiter> {
        // Reconcile in flight for a stale sequence?  Our sequence's task will wait for it before
        // doing our sequence's work.
        let old_handle = self.reconciler.take();

        // Build list of nodes from which the reconciler should detach
        let mut detach = Vec::new();
        for node_id in self.observed.locations.keys() {
            if self.intent.get_attached() != &Some(*node_id)
                && !self.intent.secondary.contains(node_id)
            {
                detach.push(
                    pageservers
                        .get(node_id)
                        .expect("Intent references non-existent pageserver")
                        .clone(),
                )
            }
        }

        // Advance the sequence before spawning a reconciler, so that sequence waiters
        // can distinguish between before+after the reconcile completes.
        self.ensure_sequence_ahead();

        let reconciler_cancel = cancel.child_token();
        let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
        let reconciler = Reconciler {
            tenant_shard_id: self.tenant_shard_id,
            shard: self.shard,
            placement_policy: self.policy.clone(),
            generation: self.generation,
            intent: reconciler_intent,
            detach,
            reconciler_config,
            config: self.config.clone(),
            preferred_az: self.intent.preferred_az_id.clone(),
            observed: self.observed.clone(),
            original_observed: self.observed.clone(),
            compute_hook: compute_hook.clone(),
            service_config: service_config.clone(),
            _gate_guard: gate_guard,
            _resource_units: units,
            cancel: reconciler_cancel.clone(),
            persistence: persistence.clone(),
            compute_notify_failure: false,
            http_client,
        };

        let reconcile_seq = self.sequence;
        let long_reconcile_threshold = service_config.long_reconcile_threshold;

        tracing::info!(seq=%reconcile_seq, "Spawning Reconciler ({reason:?})");
        let must_notify = self.pending_compute_notification;
        let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
                                                        tenant_id=%reconciler.tenant_shard_id.tenant_id,
                                                        shard_id=%reconciler.tenant_shard_id.shard_slug());
        metrics::METRICS_REGISTRY
            .metrics_group
            .storage_controller_reconcile_spawn
            .inc();
        let result_tx = result_tx.clone();
        let join_handle = tokio::task::spawn(
            async move {
                // Wait for any previous reconcile task to complete before we start
                if let Some(old_handle) = old_handle {
                    old_handle.cancel.cancel();
                    if let Err(e) = old_handle.handle.await {
                        // We can't do much with this other than log it: the task is done, so
                        // we may proceed with our work.
                        tracing::error!("Unexpected join error waiting for reconcile task: {e}");
                    }
                }

                // Early check for cancellation before doing any work
                // TODO: wrap all remote API operations in cancellation check
                // as well.
                if reconciler.cancel.is_cancelled() {
                    metrics::METRICS_REGISTRY
                        .metrics_group
                        .storage_controller_reconcile_complete
                        .inc(ReconcileCompleteLabelGroup {
                            status: ReconcileOutcome::Cancel,
                        });
                    return;
                }

                let (tenant_id_label, shard_number_label, sequence_label) = {
                    (
                        reconciler.tenant_shard_id.tenant_id.to_string(),
                        reconciler.tenant_shard_id.shard_number.0.to_string(),
                        reconcile_seq.to_string(),
                    )
                };

                let label_group = ReconcileLongRunningLabelGroup {
                    tenant_id: &tenant_id_label,
                    shard_number: &shard_number_label,
                    sequence: &sequence_label,
                };

                let reconcile_fut = Self::reconcile(reconcile_seq, reconciler, must_notify);
                let long_reconcile_fut = {
                    let label_group = label_group.clone();
                    async move {
                        tokio::time::sleep(long_reconcile_threshold).await;

                        tracing::warn!("Reconcile passed the long running threshold of {long_reconcile_threshold:?}");

                        metrics::METRICS_REGISTRY
                            .metrics_group
                            .storage_controller_reconcile_long_running
                            .inc(label_group);
                    }
                };

                let reconcile_fut = std::pin::pin!(reconcile_fut);
                let long_reconcile_fut = std::pin::pin!(long_reconcile_fut);

                let (was_long, result) =
                    match future::select(reconcile_fut, long_reconcile_fut).await {
                        Either::Left((reconcile_result, _)) => (false, reconcile_result),
                        Either::Right((_, reconcile_fut)) => (true, reconcile_fut.await),
                    };

                if was_long {
                    let id = metrics::METRICS_REGISTRY
                        .metrics_group
                        .storage_controller_reconcile_long_running
                        .with_labels(label_group);
                    metrics::METRICS_REGISTRY
                        .metrics_group
                        .storage_controller_reconcile_long_running
                        .remove_metric(id);
                }

                result_tx
                    .send(ReconcileResultRequest::ReconcileResult(result))
                    .ok();
            }
            .instrument(reconciler_span),
        );

        self.reconciler = Some(ReconcilerHandle {
            sequence: self.sequence,
            handle: join_handle,
            cancel: reconciler_cancel,
        });

        Some(ReconcilerWaiter {
            tenant_shard_id: self.tenant_shard_id,
            seq_wait: self.waiter.clone(),
            error_seq_wait: self.error_waiter.clone(),
            error: self.last_error.clone(),
            seq: self.sequence,
        })
    }

    pub(crate) fn cancel_reconciler(&self) {
        if let Some(handle) = self.reconciler.as_ref() {
            handle.cancel.cancel()
        }
    }

    /// Get a waiter for any reconciliation in flight, but do not start reconciliation
    /// if it is not already running
    pub(crate) fn get_waiter(&self) -> Option<ReconcilerWaiter> {
        if self.reconciler.is_some() {
            Some(ReconcilerWaiter {
                tenant_shard_id: self.tenant_shard_id,
                seq_wait: self.waiter.clone(),
                error_seq_wait: self.error_waiter.clone(),
                error: self.last_error.clone(),
                seq: self.sequence,
            })
        } else {
            None
        }
    }

    /// Called when a ReconcileResult has been emitted and the service is updating
    /// our state: if the result is from a sequence >= our `ReconcilerHandle`'s, then drop
    /// the handle to indicate there is no longer a reconciliation in progress.
    pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
        if let Some(reconcile_handle) = &self.reconciler {
            if reconcile_handle.sequence <= sequence {
                self.reconciler = None;
            }
        }
    }

    /// If we had any state at all referring to this node ID, drop it.  Does not
    /// attempt to reschedule.
    ///
    /// Returns true if we modified the node's intent state.
    pub(crate) fn deref_node(&mut self, node_id: NodeId) -> bool {
        let mut intent_modified = false;

        // Drop if this node was our attached intent
        if self.intent.attached == Some(node_id) {
            self.intent.attached = None;
            intent_modified = true;
        }

        // Drop from the list of secondaries, and check if we modified it
        let had_secondaries = self.intent.secondary.len();
        self.intent.secondary.retain(|n| n != &node_id);
        intent_modified |= self.intent.secondary.len() != had_secondaries;

        debug_assert!(!self.intent.all_pageservers().contains(&node_id));

        if self.preferred_node == Some(node_id) {
            self.preferred_node = None;
        }

        intent_modified
    }

    pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
        self.scheduling_policy = p;
    }

    pub(crate) fn get_scheduling_policy(&self) -> ShardSchedulingPolicy {
        self.scheduling_policy
    }

    pub(crate) fn set_last_error(&mut self, sequence: Sequence, error: ReconcileError) {
        // Ordering: always set last_error before advancing sequence, so that sequence
        // waiters are guaranteed to see a Some value when they see an error.
        *(self.last_error.lock().unwrap()) = Some(Arc::new(error));
        self.error_waiter.advance(sequence);
    }

    pub(crate) fn from_persistent(
        tsp: TenantShardPersistence,
        intent: IntentState,
    ) -> anyhow::Result<Self> {
        let tenant_shard_id = tsp.get_tenant_shard_id()?;
        let shard_identity = tsp.get_shard_identity()?;

        metrics::METRICS_REGISTRY
            .metrics_group
            .storage_controller_tenant_shards
            .inc();

        Ok(Self {
            tenant_shard_id,
            shard: shard_identity,
            sequence: Sequence::initial(),
            generation: tsp.generation.map(|g| Generation::new(g as u32)),
            policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
            intent,
            observed: ObservedState::new(),
            config: serde_json::from_str(&tsp.config).unwrap(),
            reconciler: None,
            splitting: tsp.splitting,
            // Filled in during [`Service::startup_reconcile`]
            importing: TimelineImportState::Idle,
            waiter: Arc::new(SeqWait::new(Sequence::initial())),
            error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
            last_error: Arc::default(),
            consecutive_reconciles_count: 0,
            pending_compute_notification: false,
            delayed_reconcile: false,
            scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
            preferred_node: None,
        })
    }

    pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
        TenantShardPersistence {
            tenant_id: self.tenant_shard_id.tenant_id.to_string(),
            shard_number: self.tenant_shard_id.shard_number.0 as i32,
            shard_count: self.tenant_shard_id.shard_count.literal() as i32,
            shard_stripe_size: self.shard.stripe_size.0 as i32,
            generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
            generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
            placement_policy: serde_json::to_string(&self.policy).unwrap(),
            config: serde_json::to_string(&self.config).unwrap(),
            splitting: SplitState::default(),
            scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
            preferred_az_id: self.intent.preferred_az_id.as_ref().map(|az| az.0.clone()),
        }
    }

    pub(crate) fn preferred_az(&self) -> Option<&AvailabilityZone> {
        self.intent.get_preferred_az()
    }

    pub(crate) fn set_preferred_az(
        &mut self,
        scheduler: &mut Scheduler,
        preferred_az_id: Option<AvailabilityZone>,
    ) {
        self.intent.set_preferred_az(scheduler, preferred_az_id);
    }

    /// Returns all the nodes to which this tenant shard is attached according to the
    /// observed state and the generations. Return vector is sorted from latest generation
    /// to earliest.
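    ///
    /// Illustrative example (observed state assumed; not compiled):
    /// ```ignore
    /// // Observed: node 3 attached at generation 2, node 2 attached (stale) at
    /// // generation 1.
    /// assert_eq!(
    ///     shard.attached_locations(),
    ///     vec![(NodeId(3), Generation::new(2)), (NodeId(2), Generation::new(1))]
    /// );
    /// ```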
    pub(crate) fn attached_locations(&self) -> Vec<(NodeId, Generation)> {
        self.observed
            .locations
            .iter()
            .filter_map(|(node_id, observed)| {
                use LocationConfigMode::{AttachedMulti, AttachedSingle, AttachedStale};

                let conf = observed.conf.as_ref()?;

                match (conf.generation, conf.mode) {
                    (Some(gen_), AttachedMulti | AttachedSingle | AttachedStale) => {
                        Some((*node_id, gen_))
                    }
                    _ => None,
                }
            })
            .sorted_by(|(_lhs_node_id, lhs_gen), (_rhs_node_id, rhs_gen)| {
                lhs_gen.cmp(rhs_gen).reverse()
            })
            .map(|(node_id, gen_)| (node_id, Generation::new(gen_)))
            .collect()
    }

    /// Update the observed state of the tenant by applying incremental deltas
    ///
    /// Deltas are generated by reconcilers via [`Reconciler::observed_deltas`].
    /// They are then filtered in [`crate::service::Service::process_result`].
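    ///
    /// For example (illustrative): if node 1's observed generation is `Some(5)` and an
    /// upsert delta arrives carrying generation `Some(3)`, the delta is stale; rather
    /// than applying it, the observed state for node 1 is reset to `conf: None`.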
    pub(crate) fn apply_observed_deltas(
        &mut self,
        deltas: impl Iterator<Item = ObservedStateDelta>,
    ) {
        for delta in deltas {
            match delta {
                ObservedStateDelta::Upsert(ups) => {
                    let (node_id, loc) = *ups;

                    // If the generation of the observed location in the delta is lagging
                    // behind the current one, then we have a race condition and cannot
                    // be certain about the true observed state. Set the observed state
                    // to None in order to reflect this.
                    let crnt_gen = self
                        .observed
                        .locations
                        .get(&node_id)
                        .and_then(|loc| loc.conf.as_ref())
                        .and_then(|conf| conf.generation);
                    let new_gen = loc.conf.as_ref().and_then(|conf| conf.generation);
                    match (crnt_gen, new_gen) {
                        (Some(crnt), Some(new)) if crnt > new => {
                            tracing::warn!(
                                "Skipping observed state update {}: {:?} and using None due to stale generation ({} > {})",
                                node_id,
                                loc,
                                crnt,
                                new
                            );

                            self.observed
                                .locations
                                .insert(node_id, ObservedStateLocation { conf: None });

                            continue;
                        }
                        _ => {}
                    }

                    if let Some(conf) = &loc.conf {
                        tracing::info!("Updating observed location {}: {:?}", node_id, conf);
                    } else {
                        tracing::info!("Setting observed location {} to None", node_id)
                    }

                    self.observed.locations.insert(node_id, loc);
                }
                ObservedStateDelta::Delete(node_id) => {
                    tracing::info!("Deleting observed location {}", node_id);
                    self.observed.locations.remove(&node_id);
                }
            }
        }
    }

    /// Returns true if the tenant shard is attached to a node that is outside the preferred AZ.
    ///
    /// Note that if the shard has no preferred AZ, an attached node's AZ always compares
    /// unequal to `None`, so this returns true whenever the shard is attached.
    pub(crate) fn is_attached_outside_preferred_az(&self, nodes: &HashMap<NodeId, Node>) -> bool {
        self.intent
            .get_attached()
            .map(|node_id| {
                Some(
                    nodes
                        .get(&node_id)
                        .expect("referenced node exists")
                        .get_availability_zone_id(),
                ) != self.intent.preferred_az_id.as_ref()
            })
            .unwrap_or(false)
    }
}

impl Drop for TenantShard {
    fn drop(&mut self) {
        metrics::METRICS_REGISTRY
            .metrics_group
            .storage_controller_tenant_shards
            .dec();
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::cell::RefCell;
    use std::rc::Rc;

    use pageserver_api::controller_api::NodeAvailability;
    use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber};
    use rand::SeedableRng;
    use rand::rngs::StdRng;
    use utils::id::TenantId;

    use super::*;
    use crate::scheduler::test_utils::make_test_nodes;

    fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
        let tenant_id = TenantId::generate();
        let shard_number = ShardNumber(0);
        let shard_count = ShardCount::new(1);
        let stripe_size = DEFAULT_STRIPE_SIZE;

        let tenant_shard_id = TenantShardId {
            tenant_id,
            shard_number,
            shard_count,
        };
        TenantShard::new(
            tenant_shard_id,
            ShardIdentity::new(shard_number, shard_count, stripe_size).unwrap(),
            policy,
            None,
        )
    }

    pub(crate) fn make_test_tenant(
        policy: PlacementPolicy,
        shard_count: ShardCount,
        preferred_az: Option<AvailabilityZone>,
    ) -> Vec<TenantShard> {
        make_test_tenant_with_id(TenantId::generate(), policy, shard_count, preferred_az)
    }

    pub(crate) fn make_test_tenant_with_id(
        tenant_id: TenantId,
        policy: PlacementPolicy,
        shard_count: ShardCount,
        preferred_az: Option<AvailabilityZone>,
    ) -> Vec<TenantShard> {
        let stripe_size = DEFAULT_STRIPE_SIZE;
        (0..shard_count.count())
            .map(|i| {
                let shard_number = ShardNumber(i);

                let tenant_shard_id = TenantShardId {
                    tenant_id,
                    shard_number,
                    shard_count,
                };
                TenantShard::new(
                    tenant_shard_id,
                    ShardIdentity::new(shard_number, shard_count, stripe_size).unwrap(),
                    policy.clone(),
                    preferred_az.clone(),
                )
            })
            .collect()
    }

    /// Test the scheduling behaviors used when a tenant configured for HA is subject
    /// to nodes being marked offline.
    #[test]
    fn tenant_ha_scheduling() -> anyhow::Result<()> {
        // Start with three nodes.  Our tenant will only use two.  The third one is
        // expected to remain unused.
        let mut nodes = make_test_nodes(3, &[]);

        let mut scheduler = Scheduler::new(nodes.values());
        let mut context = ScheduleContext::default();

        let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
        tenant_shard
            .schedule(&mut scheduler, &mut context)
            .expect("we have enough nodes, scheduling should work");

        // Expect the shard to initially be scheduled onto two different nodes
        assert_eq!(tenant_shard.intent.secondary.len(), 1);
        assert!(tenant_shard.intent.attached.is_some());

        let attached_node_id = tenant_shard.intent.attached.unwrap();
        let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
        assert_ne!(attached_node_id, secondary_node_id);

        // Demoting the attached node (as we would on being notified it is offline) should
        // turn it into a secondary
        let changed = tenant_shard
            .intent
            .demote_attached(&mut scheduler, attached_node_id);
        assert!(changed);
        assert!(tenant_shard.intent.attached.is_none());
        assert_eq!(tenant_shard.intent.secondary.len(), 2);

        // Update the scheduler state to indicate the node is offline
        nodes
            .get_mut(&attached_node_id)
            .unwrap()
            .set_availability(NodeAvailability::Offline);
        scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());

        // Re-scheduling the shard should promote the still-available secondary node to attached
        tenant_shard
            .schedule(&mut scheduler, &mut context)
            .expect("active nodes are available");
        assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);

        // The original attached node should have been retained as a secondary
        assert_eq!(
            *tenant_shard.intent.secondary.iter().last().unwrap(),
            attached_node_id
        );

        tenant_shard.intent.clear(&mut scheduler);

        Ok(())
    }

    #[test]
    fn intent_from_observed() -> anyhow::Result<()> {
        let nodes = make_test_nodes(3, &[]);
        let mut scheduler = Scheduler::new(nodes.values());

        let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));

        tenant_shard.observed.locations.insert(
            NodeId(3),
            ObservedStateLocation {
                conf: Some(LocationConfig {
                    mode: LocationConfigMode::AttachedMulti,
                    generation: Some(2),
                    secondary_conf: None,
                    shard_number: tenant_shard.shard.number.0,
                    shard_count: tenant_shard.shard.count.literal(),
                    shard_stripe_size: tenant_shard.shard.stripe_size.0,
                    tenant_conf: TenantConfig::default(),
                }),
            },
        );

        tenant_shard.observed.locations.insert(
            NodeId(2),
            ObservedStateLocation {
                conf: Some(LocationConfig {
                    mode: LocationConfigMode::AttachedStale,
                    generation: Some(1),
                    secondary_conf: None,
                    shard_number: tenant_shard.shard.number.0,
                    shard_count: tenant_shard.shard.count.literal(),
                    shard_stripe_size: tenant_shard.shard.stripe_size.0,
                    tenant_conf: TenantConfig::default(),
                }),
            },
        );

        tenant_shard.intent_from_observed(&mut scheduler);

        // The attached location with the highest generation gets used as attached
        assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
        // Other locations get used as secondary
        assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);

        scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;

        tenant_shard.intent.clear(&mut scheduler);
        Ok(())
    }

    #[test]
    fn scheduling_mode() -> anyhow::Result<()> {
        let nodes = make_test_nodes(3, &[]);
        let mut scheduler = Scheduler::new(nodes.values());

        let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));

        // In pause mode, schedule() shouldn't do anything
        tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
        assert!(
            tenant_shard
                .schedule(&mut scheduler, &mut ScheduleContext::default())
                .is_ok()
        );
        assert!(tenant_shard.intent.all_pageservers().is_empty());

        // In active mode, schedule() works
        tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
        assert!(
            tenant_shard
                .schedule(&mut scheduler, &mut ScheduleContext::default())
                .is_ok()
        );
        assert!(!tenant_shard.intent.all_pageservers().is_empty());

        tenant_shard.intent.clear(&mut scheduler);
        Ok(())
    }

    #[test]
    /// Simple case: moving attachment to somewhere better where we already have a secondary
    fn optimize_attachment_simple() -> anyhow::Result<()> {
        let nodes = make_test_nodes(
            3,
            &[
                AvailabilityZone("az-a".to_string()),
                AvailabilityZone("az-b".to_string()),
                AvailabilityZone("az-c".to_string()),
            ],
        );
        let mut scheduler = Scheduler::new(nodes.values());

        let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
        shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
        let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
        shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));

        // Initially: both shards are attached, each with a secondary location on the
        // other shard's attached node.
        shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
        shard_a.intent.push_secondary(&mut scheduler, NodeId(1));
        shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
        shard_b.intent.push_secondary(&mut scheduler, NodeId(2));

        fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
            let mut schedule_context = ScheduleContext::default();
            schedule_context.avoid(&shard_a.intent.all_pageservers());
            schedule_context.avoid(&shard_b.intent.all_pageservers());
            schedule_context
        }

        let schedule_context = make_schedule_context(&shard_a, &shard_b);
        let optimization_a = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
        assert_eq!(
            optimization_a,
            Some(ScheduleOptimization {
                sequence: shard_a.sequence,
                action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
                    old_attached_node_id: NodeId(2),
                    new_attached_node_id: NodeId(1)
                })
            })
        );
        shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());

        shard_a.intent.clear(&mut scheduler);
        shard_b.intent.clear(&mut scheduler);

        Ok(())
    }

    #[test]
    /// Complicated case: moving attachment to somewhere better where we do not have a secondary
    /// already, creating one as needed.
    fn optimize_attachment_multistep() -> anyhow::Result<()> {
        let nodes = make_test_nodes(
            3,
            &[
                AvailabilityZone("az-a".to_string()),
                AvailabilityZone("az-b".to_string()),
                AvailabilityZone("az-c".to_string()),
            ],
        );
        let mut scheduler = Scheduler::new(nodes.values());

        // Two shards of a tenant that wants to be in AZ A
        let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
        shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));
        let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
        shard_b.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));

        // Both shards are initially attached in non-home AZ _and_ have secondaries in non-home AZs
        shard_a.intent.set_attached(&mut scheduler, Some(NodeId(2)));
        shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
        shard_b.intent.set_attached(&mut scheduler, Some(NodeId(3)));
        shard_b.intent.push_secondary(&mut scheduler, NodeId(2));

        fn make_schedule_context(shard_a: &TenantShard, shard_b: &TenantShard) -> ScheduleContext {
            let mut schedule_context = ScheduleContext::default();
            schedule_context.avoid(&shard_a.intent.all_pageservers());
            schedule_context.avoid(&shard_b.intent.all_pageservers());
            schedule_context
        }

        let schedule_context = make_schedule_context(&shard_a, &shard_b);
        let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
        assert_eq!(
            optimization_a_prepare,
            Some(ScheduleOptimization {
                sequence: shard_a.sequence,
                action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
            })
        );
        shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());

        let schedule_context = make_schedule_context(&shard_a, &shard_b);
        let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
        assert_eq!(
            optimization_a_migrate,
            Some(ScheduleOptimization {
                sequence: shard_a.sequence,
                action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
                    old_attached_node_id: NodeId(2),
                    new_attached_node_id: NodeId(1)
                })
            })
        );
        shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());

        let schedule_context = make_schedule_context(&shard_a, &shard_b);
        let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
        assert_eq!(
            optimization_a_cleanup,
            Some(ScheduleOptimization {
                sequence: shard_a.sequence,
                action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
            })
        );
        shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());

        shard_a.intent.clear(&mut scheduler);
        shard_b.intent.clear(&mut scheduler);

        Ok(())
    }

    #[test]
    /// How the optimisation code handles a shard with a preferred node set; this is an example
    /// of the multi-step migration, but driven by a different input.
    fn optimize_attachment_multi_preferred_node() -> anyhow::Result<()> {
        let nodes = make_test_nodes(
            4,
            &[
                AvailabilityZone("az-a".to_string()),
                AvailabilityZone("az-a".to_string()),
                AvailabilityZone("az-b".to_string()),
                AvailabilityZone("az-b".to_string()),
            ],
        );
        let mut scheduler = Scheduler::new(nodes.values());

        // A single shard of a tenant that wants to be in AZ A
        let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
        shard_a.intent.preferred_az_id = Some(AvailabilityZone("az-a".to_string()));

        // Initially attached in a stable location
        shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
        shard_a.intent.push_secondary(&mut scheduler, NodeId(3));

        // Set the preferred node to node 2, an equally high scoring node to its current location
        shard_a.preferred_node = Some(NodeId(2));

        fn make_schedule_context(shard_a: &TenantShard) -> ScheduleContext {
            let mut schedule_context = ScheduleContext::default();
            schedule_context.avoid(&shard_a.intent.all_pageservers());
            schedule_context
        }

        let schedule_context = make_schedule_context(&shard_a);
        let optimization_a_prepare = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
        assert_eq!(
            optimization_a_prepare,
            Some(ScheduleOptimization {
                sequence: shard_a.sequence,
                action: ScheduleOptimizationAction::CreateSecondary(NodeId(2))
            })
        );
        shard_a.apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());

        // The first step of the optimisation should not have cleared the preferred node
        assert_eq!(shard_a.preferred_node, Some(NodeId(2)));

        let schedule_context = make_schedule_context(&shard_a);
        let optimization_a_migrate = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
        assert_eq!(
            optimization_a_migrate,
            Some(ScheduleOptimization {
                sequence: shard_a.sequence,
                action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
                    old_attached_node_id: NodeId(1),
                    new_attached_node_id: NodeId(2)
                })
            })
        );
        shard_a.apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());

        // The cutover step of the optimisation should have cleared the preferred node
        assert_eq!(shard_a.preferred_node, None);

        let schedule_context = make_schedule_context(&shard_a);
        let optimization_a_cleanup = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
        assert_eq!(
            optimization_a_cleanup,
            Some(ScheduleOptimization {
                sequence: shard_a.sequence,
                action: ScheduleOptimizationAction::RemoveSecondary(NodeId(1))
            })
        );
        shard_a.apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());

        shard_a.intent.clear(&mut scheduler);

        Ok(())
    }

    #[test]
    /// Check that multi-step migration works when moving to somewhere that is only better by
    /// 1 AffinityScore -- this ensures that we don't have a bug like the intermediate secondary
    /// counting toward the affinity score such that it prevents the rest of the migration from happening.
    fn optimize_attachment_marginal() -> anyhow::Result<()> {
        let nodes = make_test_nodes(2, &[]);
        let mut scheduler = Scheduler::new(nodes.values());

        // Multi-sharded tenant: we will craft a situation where affinity
        // scores differ only slightly
        let mut shards = make_test_tenant(PlacementPolicy::Attached(0), ShardCount::new(4), None);

        // 1 attached on node 1
        shards[0]
            .intent
            .set_attached(&mut scheduler, Some(NodeId(1)));
        // 3 attached on node 2
        shards[1]
            .intent
            .set_attached(&mut scheduler, Some(NodeId(2)));
        shards[2]
            .intent
            .set_attached(&mut scheduler, Some(NodeId(2)));
        shards[3]
            .intent
            .set_attached(&mut scheduler, Some(NodeId(2)));

        // The optimiser should figure out that we need to rebalance one of the
        // shards attached to node 2 (the test drives shards[1]):
        // - Create a secondary for it on node 1
        // - Migrate its attachment to node 1
        // - Remove its old location on node 2

        fn make_schedule_context(shards: &Vec<TenantShard>) -> ScheduleContext {
            let mut schedule_context = ScheduleContext::default();
            for shard in shards {
                schedule_context.avoid(&shard.intent.all_pageservers());
            }
            schedule_context
        }

        let schedule_context = make_schedule_context(&shards);
        let optimization_a_prepare =
            shards[1].optimize_attachment(&mut scheduler, &schedule_context);
        assert_eq!(
            optimization_a_prepare,
            Some(ScheduleOptimization {
                sequence: shards[1].sequence,
                action: ScheduleOptimizationAction::CreateSecondary(NodeId(1))
            })
        );
        shards[1].apply_optimization(&mut scheduler, optimization_a_prepare.unwrap());

        let schedule_context = make_schedule_context(&shards);
        let optimization_a_migrate =
            shards[1].optimize_attachment(&mut scheduler, &schedule_context);
        assert_eq!(
            optimization_a_migrate,
            Some(ScheduleOptimization {
                sequence: shards[1].sequence,
                action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
                    old_attached_node_id: NodeId(2),
                    new_attached_node_id: NodeId(1)
                })
            })
        );
        shards[1].apply_optimization(&mut scheduler, optimization_a_migrate.unwrap());

        let schedule_context = make_schedule_context(&shards);
        let optimization_a_cleanup =
            shards[1].optimize_attachment(&mut scheduler, &schedule_context);
        assert_eq!(
            optimization_a_cleanup,
            Some(ScheduleOptimization {
                sequence: shards[1].sequence,
                action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
            })
        );
        shards[1].apply_optimization(&mut scheduler, optimization_a_cleanup.unwrap());

        // Everything should be stable now
        let schedule_context = make_schedule_context(&shards);
        assert_eq!(
            shards[0].optimize_attachment(&mut scheduler, &schedule_context),
            None
        );
        assert_eq!(
            shards[1].optimize_attachment(&mut scheduler, &schedule_context),
            None
        );
        assert_eq!(
            shards[2].optimize_attachment(&mut scheduler, &schedule_context),
            None
        );
        assert_eq!(
            shards[3].optimize_attachment(&mut scheduler, &schedule_context),
            None
        );

        for mut shard in shards {
            shard.intent.clear(&mut scheduler);
        }

        Ok(())
    }

    #[test]
    fn optimize_secondary() -> anyhow::Result<()> {
        let nodes = make_test_nodes(4, &[]);
        let mut scheduler = Scheduler::new(nodes.values());

        let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
        let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));

        // Initially: the two shards are attached on different nodes, but both have
        // their secondary location on node 3.
        shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
        shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
        shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
        shard_b.intent.push_secondary(&mut scheduler, NodeId(3));

        let mut schedule_context = ScheduleContext::default();
        schedule_context.avoid(&shard_a.intent.all_pageservers());
        schedule_context.avoid(&shard_b.intent.all_pageservers());

        let optimization_a = shard_a.optimize_secondary(&mut scheduler, &schedule_context);

        // Since there is a node with no locations available, the node with two locations for the
        // same tenant should generate an optimization to move one away
        assert_eq!(
            optimization_a,
            Some(ScheduleOptimization {
                sequence: shard_a.sequence,
                action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
                    old_node_id: NodeId(3),
                    new_node_id: NodeId(4)
                })
            })
        );

        shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
        assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
        assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);

        shard_a.intent.clear(&mut scheduler);
        shard_b.intent.clear(&mut scheduler);

        Ok(())
    }

    /// Test how the optimisation code behaves with an extra secondary
    #[test]
    fn optimize_removes_secondary() -> anyhow::Result<()> {
        let az_a_tag = AvailabilityZone("az-a".to_string());
        let az_b_tag = AvailabilityZone("az-b".to_string());
        let mut nodes = make_test_nodes(
            4,
            &[
                az_a_tag.clone(),
                az_b_tag.clone(),
                az_a_tag.clone(),
                az_b_tag.clone(),
            ],
        );
        let mut scheduler = Scheduler::new(nodes.values());

        let mut schedule_context = ScheduleContext::default();

        let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
        shard_a.intent.preferred_az_id = Some(az_a_tag.clone());
        shard_a
            .schedule(&mut scheduler, &mut schedule_context)
            .unwrap();

        // Attached on node 1, secondary on node 2
        assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
        assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(2)]);

        // Initially the optimiser is idle
        assert_eq!(
            shard_a.optimize_attachment(&mut scheduler, &schedule_context),
            None
        );
        assert_eq!(
            shard_a.optimize_secondary(&mut scheduler, &schedule_context),
            None
        );

        // A spare secondary in the home AZ: it should be removed -- this is the
        // situation when we're midway through a graceful migration, after cutting
        // over to our new location
        shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
        let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
        assert_eq!(
            optimization,
            Some(ScheduleOptimization {
                sequence: shard_a.sequence,
                action: ScheduleOptimizationAction::RemoveSecondary(NodeId(3))
            })
        );
        shard_a.apply_optimization(&mut scheduler, optimization.unwrap());

        // A spare secondary in the non-home AZ, and one of them is offline
        shard_a.intent.push_secondary(&mut scheduler, NodeId(4));
        nodes
            .get_mut(&NodeId(4))
            .unwrap()
            .set_availability(NodeAvailability::Offline);
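        // Push the updated availability into the scheduler's view, so it knows node 4 is offline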
        scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
        let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
        assert_eq!(
            optimization,
            Some(ScheduleOptimization {
                sequence: shard_a.sequence,
                action: ScheduleOptimizationAction::RemoveSecondary(NodeId(4))
            })
        );
        shard_a.apply_optimization(&mut scheduler, optimization.unwrap());

        // A spare secondary when we should have none
        shard_a.policy = PlacementPolicy::Attached(0);
        let optimization = shard_a.optimize_attachment(&mut scheduler, &schedule_context);
        assert_eq!(
            optimization,
            Some(ScheduleOptimization {
                sequence: shard_a.sequence,
                action: ScheduleOptimizationAction::RemoveSecondary(NodeId(2))
            })
        );
        shard_a.apply_optimization(&mut scheduler, optimization.unwrap());
        assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
        assert_eq!(shard_a.intent.get_secondary(), &vec![]);

        // Check that in secondary mode, we preserve the secondary in the preferred AZ
        let mut schedule_context = ScheduleContext::default(); // Fresh context, we're about to call schedule()
        shard_a.policy = PlacementPolicy::Secondary;
        shard_a
            .schedule(&mut scheduler, &mut schedule_context)
            .unwrap();
        assert_eq!(shard_a.intent.get_attached(), &None);
        assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
        assert_eq!(
            shard_a.optimize_attachment(&mut scheduler, &schedule_context),
            None
        );
        assert_eq!(
            shard_a.optimize_secondary(&mut scheduler, &schedule_context),
            None
        );

        shard_a.intent.clear(&mut scheduler);

        Ok(())
    }

    /// Optimize until quiescent: this emulates what Service::optimize_all does when called
    /// repeatedly in the background. Returns the optimizations that were applied.
    fn optimize_til_idle(
        scheduler: &mut Scheduler,
        shards: &mut [TenantShard],
    ) -> Vec<ScheduleOptimization> {
        let mut loop_n = 0;
        let mut optimizations = Vec::default();
        loop {
            let mut schedule_context = ScheduleContext::default();
            let mut any_changed = false;

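            // Record all shards' current locations in the context, so that optimization
            // decisions are made with knowledge of the whole set's placement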
            for shard in shards.iter() {
                schedule_context.avoid(&shard.intent.all_pageservers());
            }

            for shard in shards.iter_mut() {
                let optimization = shard.optimize_attachment(scheduler, &schedule_context);
                tracing::info!(
                    "optimize_attachment({})={:?}",
                    shard.tenant_shard_id,
                    optimization
                );
                if let Some(optimization) = optimization {
                    // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
                    assert!(shard.maybe_optimizable(scheduler, &schedule_context));
                    optimizations.push(optimization.clone());
                    shard.apply_optimization(scheduler, optimization);
                    any_changed = true;
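                    // Apply at most one optimization per pass: the context is now stale, so
                    // break and rebuild it before looking for more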
                    break;
                }

                let optimization = shard.optimize_secondary(scheduler, &schedule_context);
                tracing::info!(
                    "optimize_secondary({})={:?}",
                    shard.tenant_shard_id,
                    optimization
                );
                if let Some(optimization) = optimization {
                    // Check that maybe_optimizable wouldn't have wrongly claimed this optimization didn't exist
                    assert!(shard.maybe_optimizable(scheduler, &schedule_context));

                    optimizations.push(optimization.clone());
                    shard.apply_optimization(scheduler, optimization);
                    any_changed = true;
                    break;
                }
            }

            if !any_changed {
                break;
            }

            // Assert no infinite loop
            loop_n += 1;
            assert!(loop_n < 1000);
        }

        optimizations
    }

    /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
    /// that it converges.
    #[test]
    fn optimize_add_nodes() -> anyhow::Result<()> {
        let nodes = make_test_nodes(
            9,
            &[
                // Initial 6 nodes
                AvailabilityZone("az-a".to_string()),
                AvailabilityZone("az-a".to_string()),
                AvailabilityZone("az-b".to_string()),
                AvailabilityZone("az-b".to_string()),
                AvailabilityZone("az-c".to_string()),
                AvailabilityZone("az-c".to_string()),
                // Three we will add later
                AvailabilityZone("az-a".to_string()),
                AvailabilityZone("az-b".to_string()),
                AvailabilityZone("az-c".to_string()),
            ],
        );

        // Only show the scheduler two nodes in each AZ to start with
        let mut scheduler = Scheduler::new([].iter());
        for i in 1..=6 {
            scheduler.node_upsert(nodes.get(&NodeId(i)).unwrap());
        }

        let mut shards = make_test_tenant(
            PlacementPolicy::Attached(1),
            ShardCount::new(4),
            Some(AvailabilityZone("az-a".to_string())),
        );
        let mut schedule_context = ScheduleContext::default();
        for shard in &mut shards {
            assert!(
                shard
                    .schedule(&mut scheduler, &mut schedule_context)
                    .is_ok()
            );
        }

        // Initial: attached locations land in the tenant's home AZ.
        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 2);
        assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);

        // Initial: secondary locations in a remote AZ
        assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
        assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
        assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
        assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);

        // Add another three nodes: we should see the shards spread out when their optimize
        // methods are called
        scheduler.node_upsert(nodes.get(&NodeId(7)).unwrap());
        scheduler.node_upsert(nodes.get(&NodeId(8)).unwrap());
        scheduler.node_upsert(nodes.get(&NodeId(9)).unwrap());
        optimize_til_idle(&mut scheduler, &mut shards);

        // We expect that one attached location has moved to the new node in the tenant's home AZ
        assert_eq!(scheduler.get_node_shard_count(NodeId(7)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(7)), 1);
        // The original node now has one fewer attached shard
        assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(1)), 1);

        // One of the original nodes still has two attachments: the home AZ now has three nodes
        // for four attached shards, so they cannot be spread evenly
        assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 2);

        // None of our secondaries moved, since we already had enough nodes for those to be
        // scheduled perfectly
        assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(3)), 0);
        assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(4)), 0);
        assert_eq!(scheduler.get_node_shard_count(NodeId(5)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(5)), 0);
        assert_eq!(scheduler.get_node_shard_count(NodeId(6)), 1);
        assert_eq!(scheduler.get_node_attached_shard_count(NodeId(6)), 0);

        for shard in shards.iter_mut() {
            shard.intent.clear(&mut scheduler);
        }

        Ok(())
    }

    /// Test that initial shard scheduling is optimal. By optimal we mean
    /// that the optimizer cannot find a way to improve it.
    ///
    /// This test is an example of the scheduling issue described in
    /// <https://github.com/neondatabase/neon/issues/8969>
    #[test]
    fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
        use itertools::Itertools;

        let nodes = make_test_nodes(2, &[]);

        let mut scheduler = Scheduler::new([].iter());
        scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
        scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());

        let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
        let a_context = Rc::new(RefCell::new(ScheduleContext::default()));

        let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4), None);
        let b_context = Rc::new(RefCell::new(ScheduleContext::default()));

        let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
        let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));

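        // Interleave the two tenants' shards, emulating the interleaved scheduling order that
        // triggers the issue linked above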
        let schedule_order = a_shards_with_context.interleave(b_shards_with_context);

        for (shard, context) in schedule_order {
            let context = &mut *context.borrow_mut();
            shard.schedule(&mut scheduler, context).unwrap();
        }

        let applied_to_a = optimize_til_idle(&mut scheduler, &mut a);
        assert_eq!(applied_to_a, vec![]);

        let applied_to_b = optimize_til_idle(&mut scheduler, &mut b);
        assert_eq!(applied_to_b, vec![]);

        for shard in a.iter_mut().chain(b.iter_mut()) {
            shard.intent.clear(&mut scheduler);
        }

        Ok(())
    }

    #[test]
    fn random_az_shard_scheduling() -> anyhow::Result<()> {
        use rand::seq::SliceRandom;

        for seed in 0..50 {
            eprintln!("Running test with seed {seed}");
            let mut rng = StdRng::seed_from_u64(seed);

            let az_a_tag = AvailabilityZone("az-a".to_string());
            let az_b_tag = AvailabilityZone("az-b".to_string());
            let azs = [az_a_tag, az_b_tag];
            let nodes = make_test_nodes(4, &azs);
            let mut shards_per_az: HashMap<AvailabilityZone, u32> = HashMap::new();

            let mut scheduler = Scheduler::new([].iter());
            for node in nodes.values() {
                scheduler.node_upsert(node);
            }

            let mut shards = Vec::default();
            let mut contexts = Vec::default();
            let mut az_picker = azs.iter().cycle().cloned();
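            // Create 100 tenants with varying shard counts, alternating their preferred AZ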
            for i in 0..100 {
                let az = az_picker.next().unwrap();
                let shard_count = i % 4 + 1;
                *shards_per_az.entry(az.clone()).or_default() += shard_count;

                let tenant_shards = make_test_tenant(
                    PlacementPolicy::Attached(1),
                    ShardCount::new(shard_count.try_into().unwrap()),
                    Some(az),
                );
                let context = Rc::new(RefCell::new(ScheduleContext::default()));

                contexts.push(context.clone());
                let with_ctx = tenant_shards
                    .into_iter()
                    .map(|shard| (shard, context.clone()));
                for shard_with_ctx in with_ctx {
                    shards.push(shard_with_ctx);
                }
            }

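            // Schedule in a random order: the resulting placement should be balanced
            // regardless of the order in which shards arrive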
            shards.shuffle(&mut rng);

            #[derive(Default, Debug)]
            struct NodeStats {
                attachments: u32,
                secondaries: u32,
            }

            let mut node_stats: HashMap<NodeId, NodeStats> = HashMap::default();
            let mut attachments_in_wrong_az = 0;
            let mut secondaries_in_wrong_az = 0;

            for (shard, context) in &mut shards {
                let context = &mut *context.borrow_mut();
                shard.schedule(&mut scheduler, context).unwrap();

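                // PlacementPolicy::Attached(1) guarantees exactly one attached location and
                // one secondary location per shard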
                let attached_node = shard.intent.get_attached().unwrap();
                let stats = node_stats.entry(attached_node).or_default();
                stats.attachments += 1;

                let secondary_node = *shard.intent.get_secondary().first().unwrap();
                let stats = node_stats.entry(secondary_node).or_default();
                stats.secondaries += 1;

                let attached_node_az = nodes
                    .get(&attached_node)
                    .unwrap()
                    .get_availability_zone_id();
                let secondary_node_az = nodes
                    .get(&secondary_node)
                    .unwrap()
                    .get_availability_zone_id();
                let preferred_az = shard.preferred_az().unwrap();

                if attached_node_az != preferred_az {
                    eprintln!(
                        "{} attachment was scheduled in AZ {} but preferred AZ {}",
                        shard.tenant_shard_id, attached_node_az, preferred_az
                    );
                    attachments_in_wrong_az += 1;
                }

                if secondary_node_az == preferred_az {
                    eprintln!(
                        "{} secondary was scheduled in AZ {} which matches its preferred AZ \
                         (secondaries should be in a different AZ)",
                        shard.tenant_shard_id, secondary_node_az
                    );
                    secondaries_in_wrong_az += 1;
                }
            }

            let mut violations = Vec::default();

            if attachments_in_wrong_az > 0 {
                violations.push(format!(
                    "{attachments_in_wrong_az} attachments scheduled to the incorrect AZ"
                ));
            }

            if secondaries_in_wrong_az > 0 {
                violations.push(format!(
                    "{secondaries_in_wrong_az} secondaries scheduled to the incorrect AZ"
                ));
            }

            eprintln!(
                "attachments_in_wrong_az={attachments_in_wrong_az} secondaries_in_wrong_az={secondaries_in_wrong_az}"
            );

            for (node_id, stats) in &node_stats {
                let node_az = nodes.get(node_id).unwrap().get_availability_zone_id();
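                // make_test_nodes created two nodes per AZ, so the ideal is an even split of
                // the AZ's attachments between them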
                let ideal_attachment_load = shards_per_az.get(node_az).unwrap() / 2;
                let allowed_attachment_load =
                    (ideal_attachment_load - 1)..(ideal_attachment_load + 2);

                if !allowed_attachment_load.contains(&stats.attachments) {
                    violations.push(format!(
                        "Found {} attachments on node {}, but expected approximately {}",
                        stats.attachments, node_id, ideal_attachment_load
                    ));
                }

                eprintln!(
                    "{}: attachments={} secondaries={} ideal_attachment_load={}",
                    node_id, stats.attachments, stats.secondaries, ideal_attachment_load
                );
            }

            assert!(violations.is_empty(), "{violations:?}");

            for (mut shard, _ctx) in shards {
                shard.intent.clear(&mut scheduler);
            }
        }
        Ok(())
    }

    /// Check how the shard's scheduling behaves when in PlacementPolicy::Secondary mode.
    #[test]
    fn tenant_secondary_scheduling() -> anyhow::Result<()> {
        let az_a = AvailabilityZone("az-a".to_string());
        let nodes = make_test_nodes(
            3,
            &[
                az_a.clone(),
                AvailabilityZone("az-b".to_string()),
                AvailabilityZone("az-c".to_string()),
            ],
        );

        let mut scheduler = Scheduler::new(nodes.values());
        let mut context = ScheduleContext::default();

        let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Secondary);
        tenant_shard.intent.preferred_az_id = Some(az_a.clone());
        tenant_shard
            .schedule(&mut scheduler, &mut context)
            .expect("we have enough nodes, scheduling should work");
        assert_eq!(tenant_shard.intent.secondary.len(), 1);
        assert!(tenant_shard.intent.attached.is_none());

        // Should have scheduled into the preferred AZ
        assert_eq!(
            scheduler
                .get_node_az(&tenant_shard.intent.secondary[0])
                .as_ref(),
            tenant_shard.preferred_az()
        );

        // Optimizer should agree
        assert_eq!(
            tenant_shard.optimize_attachment(&mut scheduler, &context),
            None
        );
        assert_eq!(
            tenant_shard.optimize_secondary(&mut scheduler, &context),
            None
        );

        // Switch to PlacementPolicy::Attached
        tenant_shard.policy = PlacementPolicy::Attached(1);
        tenant_shard
            .schedule(&mut scheduler, &mut context)
            .expect("we have enough nodes, scheduling should work");
        assert_eq!(tenant_shard.intent.secondary.len(), 1);
        assert!(tenant_shard.intent.attached.is_some());
        // Secondary should now be in non-preferred AZ
        assert_ne!(
            scheduler
                .get_node_az(&tenant_shard.intent.secondary[0])
                .as_ref(),
            tenant_shard.preferred_az()
        );
        // Attached should be in preferred AZ
        assert_eq!(
            scheduler
                .get_node_az(&tenant_shard.intent.attached.unwrap())
                .as_ref(),
            tenant_shard.preferred_az()
        );

        // Optimizer should agree
        assert_eq!(
            tenant_shard.optimize_attachment(&mut scheduler, &context),
            None
        );
        assert_eq!(
            tenant_shard.optimize_secondary(&mut scheduler, &context),
            None
        );

        // Switch back to PlacementPolicy::Secondary
        tenant_shard.policy = PlacementPolicy::Secondary;
        tenant_shard
            .schedule(&mut scheduler, &mut context)
            .expect("we have enough nodes, scheduling should work");
        assert_eq!(tenant_shard.intent.secondary.len(), 1);
        assert!(tenant_shard.intent.attached.is_none());
        // When we picked a location to keep, we should have kept the one in the preferred AZ
        assert_eq!(
            scheduler
                .get_node_az(&tenant_shard.intent.secondary[0])
                .as_ref(),
            tenant_shard.preferred_az()
        );

        // Optimizer should agree
        assert_eq!(
            tenant_shard.optimize_attachment(&mut scheduler, &context),
            None
        );
        assert_eq!(
            tenant_shard.optimize_secondary(&mut scheduler, &context),
            None
        );

        tenant_shard.intent.clear(&mut scheduler);

        Ok(())
    }
}
